swarm_dial.go 13.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"errors"
	"fmt"
	"sync"
	"time"

Steven Allen's avatar
Steven Allen committed
10
	logging "github.com/ipfs/go-log"
Jeromy's avatar
Jeromy committed
11 12
	addrutil "github.com/libp2p/go-addr-util"
	lgbl "github.com/libp2p/go-libp2p-loggables"
Steven Allen's avatar
Steven Allen committed
13
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
14
	peer "github.com/libp2p/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
15
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
16
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

// Errors returned by the swarm's dialing code.
var (
	// ErrDialBackoff is returned by the backoff code when a given peer has
	// been dialed too frequently.
	ErrDialBackoff = errors.New("dial backoff")

	// ErrDialFailed is returned when connecting to a peer has ultimately
	// failed.
	ErrDialFailed = errors.New("dial attempt failed")

	// ErrDialToSelf is returned if we attempt to dial our own peer.
	ErrDialToSelf = errors.New("dial to self attempted")

	// ErrNoTransport is returned when no known transport can handle the
	// given multiaddr.
	ErrNoTransport = errors.New("no transport for protocol")
)

Steven Allen's avatar
Steven Allen committed
46
const (
	// DialAttempts governs how many times a goroutine will try to dial a
	// given peer.
	//
	// Note: this is down to one, as we have _too many dials_ atm. To add it
	// back in, add the retry loop back in Dial(.)
	DialAttempts = 1

	// ConcurrentFdDials is the number of concurrent outbound dials over
	// transports that consume file descriptors.
	ConcurrentFdDials = 160

	// DefaultPerPeerRateLimit is the number of concurrent outbound dials to
	// make per peer.
	DefaultPerPeerRateLimit = 8
)
Jeromy's avatar
Jeromy committed
58

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
// Dial backoff (see DialBackoff) is used to avoid over-dialing the same,
// dead peers. Whenever a dial to a peer ultimately fails, we add the peer to
// the backoff. Then, whenever goroutines would _wait_ (dialsync), they first
// check the backoff. If the peer is backed off, they don't wait and exit
// promptly with an error (the single goroutine that is actually dialing
// continues to dial). If a dial is successful, the peer is removed from the
// backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return ErrDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		backoff.AddBackoff(p)
//  		continue
//  	}
//  	backoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
84

Steven Allen's avatar
Steven Allen committed
85 86 87 88 89 90
// DialBackoff is a type for tracking peer dial backoffs.
//
// * It's safe to use it's zero value.
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
Jeromy's avatar
Jeromy committed
91
	entries map[peer.ID]*backoffPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93 94
	lock    sync.RWMutex
}

Jeromy's avatar
Jeromy committed
95 96 97 98 99
type backoffPeer struct {
	tries int
	until time.Time
}

Steven Allen's avatar
Steven Allen committed
100
func (db *DialBackoff) init() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
	if db.entries == nil {
Jeromy's avatar
Jeromy committed
102
		db.entries = make(map[peer.ID]*backoffPeer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103 104 105 106
	}
}

// Backoff returns whether the client should backoff from dialing
Jeromy's avatar
Jeromy committed
107
// peer p
Steven Allen's avatar
Steven Allen committed
108
func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
110
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111
	db.init()
Jeromy's avatar
Jeromy committed
112 113 114 115 116 117
	bp, found := db.entries[p]
	if found && time.Now().Before(bp.until) {
		return true
	}

	return false
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
}

Steven Allen's avatar
Steven Allen committed
120 121 122 123 124 125 126 127
// Tunables for the quadratic dial backoff computed by AddBackoff.
var (
	// BackoffBase is the base amount of time to backoff (default: 5s).
	BackoffBase = 5 * time.Second

	// BackoffCoef is the backoff coefficient (default: 1s).
	BackoffCoef = time.Second

	// BackoffMax is the maximum backoff time (default: 5m).
	BackoffMax = 5 * time.Minute
)
Jeromy's avatar
Jeromy committed
128

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
Steven Allen's avatar
Steven Allen committed
132 133 134 135 136 137 138 139
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
141
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
	db.init()
Jeromy's avatar
Jeromy committed
143 144 145 146
	bp, ok := db.entries[p]
	if !ok {
		db.entries[p] = &backoffPeer{
			tries: 1,
Steven Allen's avatar
Steven Allen committed
147
			until: time.Now().Add(BackoffBase),
Jeromy's avatar
Jeromy committed
148 149 150 151
		}
		return
	}

Steven Allen's avatar
Steven Allen committed
152 153 154
	backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
	if backoffTime > BackoffMax {
		backoffTime = BackoffMax
Jeromy's avatar
Jeromy committed
155
	}
Steven Allen's avatar
Steven Allen committed
156
	bp.until = time.Now().Add(backoffTime)
Jeromy's avatar
Jeromy committed
157
	bp.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158 159 160 161
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
Steven Allen's avatar
Steven Allen committed
162
func (db *DialBackoff) Clear(p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
164
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166 167 168
	db.init()
	delete(db.entries, p)
}

Steven Allen's avatar
Steven Allen committed
169
// DialPeer connects to a peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171 172 173
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
Steven Allen's avatar
Steven Allen committed
174 175 176 177 178 179 180 181 182 183 184
// etc. to achieve connection.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
	return s.dialPeer(ctx, p)
}

// internal dial method that returns an unwrapped conn
//
// It is gated by the swarm's dial synchronization systems: dialsync and
// dialbackoff.
func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
	log.Debugf("[%s] swarm dialing peer [%s]", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
186 187 188 189 190
	err := p.Validate()
	if err != nil {
		return nil, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191 192 193 194 195
	if p == s.local {
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
	}

196
	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197 198

	// check if we already have an open connection first
Steven Allen's avatar
Steven Allen committed
199
	conn := s.bestConnToPeer(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200 201 202 203
	if conn != nil {
		return conn, nil
	}

204 205
	// if this peer has been backed off, lets get out of here
	if s.backf.Backoff(p) {
206
		log.Event(ctx, "swarmDialBackoff", p)
207 208 209
		return nil, ErrDialBackoff
	}

210 211 212 213
	// apply the DialPeer timeout
	ctx, cancel := context.WithTimeout(ctx, inet.GetDialPeerTimeout(ctx))
	defer cancel()

214
	conn, err = s.dsync.DialLock(ctx, p)
Steven Allen's avatar
Steven Allen committed
215 216 217
	if err != nil {
		return nil, err
	}
218

Steven Allen's avatar
Steven Allen committed
219 220
	log.Debugf("network for %s finished dialing %s", s.local, p)
	return conn, err
221
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222

223 224 225
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
Steven Allen's avatar
Steven Allen committed
226 227 228 229 230 231 232 233
	// Short circuit.
	// By the time we take the dial lock, we may already *have* a connection
	// to the peer.
	c := s.bestConnToPeer(p)
	if c != nil {
		return c, nil
	}

Steven Allen's avatar
Steven Allen committed
234 235
	logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)

236 237 238
	// ok, we have been charged to dial! let's do it.
	// if it succeeds, dial will add the conn to the swarm itself.
	defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
Steven Allen's avatar
Steven Allen committed
239 240

	conn, err := s.dial(ctx, p)
241
	if err != nil {
Steven Allen's avatar
Steven Allen committed
242 243 244 245 246 247 248 249 250
		conn = s.bestConnToPeer(p)
		if conn != nil {
			// Hm? What error?
			// Could have canceled the dial because we received a
			// connection or some other random reason.
			// Just ignore the error and return the connection.
			log.Debugf("ignoring dial error because we have a connection: %s", err)
			return conn, nil
		}
251 252 253 254
		if err != context.Canceled {
			log.Event(ctx, "swarmDialBackoffAdd", logdial)
			s.backf.AddBackoff(p) // let others know to backoff
		}
255

Steven Allen's avatar
Steven Allen committed
256
		// ok, we failed.
257
		return nil, fmt.Errorf("dial attempt failed: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258
	}
259
	return conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
260 261
}

Steven Allen's avatar
Steven Allen committed
262 263 264 265 266
// canDial reports whether the swarm has a transport for addr that is
// willing to dial it.
func (s *Swarm) canDial(addr ma.Multiaddr) bool {
	if t := s.TransportForDialing(addr); t != nil {
		return t.CanDial(addr)
	}
	return false
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267 268 269 270 271 272 273 274 275 276 277
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	if p == s.local {
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
	}
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.

	sk := s.peers.PrivKey(s.local)
Matt Joiner's avatar
Matt Joiner committed
278
	logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
279 280 281 282 283
	if sk == nil {
		// fine for sk to be nil, just log.
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

Jeromy's avatar
Jeromy committed
284 285
	//////
	/*
286
		This slice-to-chan code is temporary, the peerstore can currently provide
Jeromy's avatar
Jeromy committed
287 288 289 290 291
		a channel as an interface for receiving addresses, but more thought
		needs to be put into the execution. For now, this allows us to use
		the improved rate limiter, while maintaining the outward behaviour
		that we previously had (halting a dial when we run out of addrs)
	*/
Matt Joiner's avatar
Matt Joiner committed
292 293 294 295 296
	peerAddrs := s.peers.Addrs(p)
	if len(peerAddrs) == 0 {
		return nil, errors.New("no addresses")
	}
	goodAddrs := s.filterKnownUndialables(peerAddrs)
Matt Joiner's avatar
Matt Joiner committed
297 298 299
	if len(goodAddrs) == 0 {
		return nil, errors.New("no good addresses")
	}
300
	goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
301
	for _, a := range goodAddrs {
302
		goodAddrsChan <- a
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
303
	}
304
	close(goodAddrsChan)
Jeromy's avatar
Jeromy committed
305
	/////////
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306 307

	// try to get a connection to any addr
308
	connC, err := s.dialAddrs(ctx, p, goodAddrsChan)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309
	if err != nil {
Jeromy's avatar
Jeromy committed
310
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311 312
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
313 314 315 316
	logdial["conn"] = logging.Metadata{
		"localAddr":  connC.LocalMultiaddr(),
		"remoteAddr": connC.RemoteMultiaddr(),
	}
317
	swarmC, err := s.addConn(connC, inet.DirOutbound)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
318
	if err != nil {
Jeromy's avatar
Jeromy committed
319
		logdial["error"] = err.Error()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320 321 322 323 324 325 326 327
		connC.Close() // close the connection. didn't work out :(
		return nil, err
	}

	logdial["dial"] = "success"
	return swarmC, nil
}

328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
// filterKnownUndialables returns the subset of addrs worth dialing. It
// drops addresses we definitely don't want to dial: those blocked by the
// swarm's filters, IPv6 link-local addresses, those without a transport
// able to dial them, and those we know to be our own.
// This is an optimization to avoid wasting time on dials that we know are
// going to fail.
func (s *Swarm) filterKnownUndialables(addrs []ma.Multiaddr) []ma.Multiaddr {
	// The error is deliberately ignored: failing to list our own addresses
	// only weakens the self-address filter below; it is not fatal.
	listenAddrs, _ := s.InterfaceListenAddresses()

	// We're only sure about filtering out plain two-component /ip4 and /ip6
	// addresses, so far.
	var ourAddrs []ma.Multiaddr
	for _, la := range listenAddrs {
		protos := la.Protocols()
		if len(protos) != 2 {
			continue
		}
		if code := protos[0].Code; code == ma.P_IP4 || code == ma.P_IP6 {
			ourAddrs = append(ourAddrs, la)
		}
	}

	return addrutil.FilterAddrs(addrs,
		addrutil.SubtractFilter(ourAddrs...),
		s.canDial,
		// TODO: Consider allowing link-local addresses
		addrutil.AddrOverNonLocalIP,
		addrutil.FilterNeg(s.Filters.AddrBlocked),
	)
}

Steven Allen's avatar
Steven Allen committed
353
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.Conn, error) {
Jeromy's avatar
Jeromy committed
354
	log.Debugf("%s swarm dialing %s", s.local, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355 356 357 358

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

Jeromy's avatar
Jeromy committed
359 360
	// use a single response type instead of errs and conns, reduces complexity *a ton*
	respch := make(chan dialResult)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
361

Matt Joiner's avatar
Matt Joiner committed
362
	defaultDialFail := inet.ErrNoRemoteAddrs
Jeromy's avatar
Jeromy committed
363
	exitErr := defaultDialFail
Jeromy's avatar
Jeromy committed
364

365 366
	defer s.limiter.clearAllPeerDials(p)

Jeromy's avatar
Jeromy committed
367
	var active int
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
	for remoteAddrs != nil || active > 0 {
		// Check for context cancellations and/or responses first.
		select {
		case <-ctx.Done():
			if exitErr == defaultDialFail {
				exitErr = ctx.Err()
			}
			return nil, exitErr
		case resp := <-respch:
			active--
			if resp.Err != nil {
				log.Infof("got error on dial to %s: %s", resp.Addr, resp.Err)
				// Errors are normal, lots of dials will fail
				exitErr = resp.Err
			} else if resp.Conn != nil {
				return resp.Conn, nil
			}

			// We got a result, try again from the top.
			continue
		default:
		}

		// Now, attempt to dial.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392
		select {
Jeromy's avatar
Jeromy committed
393 394 395 396
		case addr, ok := <-remoteAddrs:
			if !ok {
				remoteAddrs = nil
				continue
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
397 398
			}

Jeromy's avatar
Jeromy committed
399 400 401 402 403
			s.limitedDial(ctx, p, addr, respch)
			active++
		case <-ctx.Done():
			if exitErr == defaultDialFail {
				exitErr = ctx.Err()
Jeromy's avatar
Jeromy committed
404
			}
Jeromy's avatar
Jeromy committed
405 406 407 408
			return nil, exitErr
		case resp := <-respch:
			active--
			if resp.Err != nil {
Marten Seemann's avatar
Marten Seemann committed
409
				log.Infof("got error on dial to %s: %s", resp.Addr, resp.Err)
Jeromy's avatar
Jeromy committed
410 411 412 413
				// Errors are normal, lots of dials will fail
				exitErr = resp.Err
			} else if resp.Conn != nil {
				return resp.Conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414 415 416
			}
		}
	}
417
	return nil, exitErr
Jeromy's avatar
Jeromy committed
418 419
}

Jeromy's avatar
Jeromy committed
420 421 422
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
Jeromy's avatar
Jeromy committed
423 424 425 426 427 428 429
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	s.limiter.AddDialJob(&dialJob{
		addr: a,
		peer: p,
		resp: resp,
		ctx:  ctx,
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430 431
}

Steven Allen's avatar
Steven Allen committed
432 433 434 435 436
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.Conn, error) {
	// Just to double check. Costs nothing.
	if s.local == p {
		return nil, ErrDialToSelf
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

439 440
	tpt := s.TransportForDialing(addr)
	if tpt == nil {
Steven Allen's avatar
Steven Allen committed
441
		return nil, ErrNoTransport
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
442 443
	}

444
	connC, err := tpt.Dial(ctx, addr, p)
Steven Allen's avatar
Steven Allen committed
445 446
	if err != nil {
		return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447 448
	}

Steven Allen's avatar
Steven Allen committed
449 450
	// Trust the transport? Yeah... right.
	if connC.RemotePeer() != p {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
451
		connC.Close()
452
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
Steven Allen's avatar
Steven Allen committed
453 454
		log.Error(err)
		return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
455 456 457 458 459
	}

	// success! we got one!
	return connC, nil
}