dial_test.go 12.2 KB
Newer Older
Steven Allen's avatar
Steven Allen committed
1
package swarm_test
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"net"
	"sync"
	"testing"
	"time"

Jeromy's avatar
Jeromy committed
10 11 12
	addrutil "github.com/libp2p/go-addr-util"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
13
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
14
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
15 16 17 18
	testutil "github.com/libp2p/go-testutil"
	ci "github.com/libp2p/go-testutil/ci"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
Steven Allen's avatar
Steven Allen committed
19 20

	. "github.com/libp2p/go-libp2p-swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
)

Steven Allen's avatar
Steven Allen committed
23
func init() {
24
	transport.DialTimeout = time.Second
Steven Allen's avatar
Steven Allen committed
25 26
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32
// closeSwarms calls Close on every swarm in the slice, in order. Whatever
// Close returns is not inspected.
func closeSwarms(swarms []*Swarm) {
	for i := range swarms {
		swarms[i].Close()
	}
}

Steven Allen's avatar
Steven Allen committed
33
func TestBasicDialPeer(t *testing.T) {
Jeromy's avatar
Jeromy committed
34 35 36 37 38 39 40 41
	t.Parallel()
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	s1 := swarms[0]
	s2 := swarms[1]

Steven Allen's avatar
Steven Allen committed
42
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), pstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
43

Steven Allen's avatar
Steven Allen committed
44
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
	if err != nil {
		t.Fatal(err)
	}

	s, err := c.NewStream()
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

func TestDialWithNoListeners(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	s1 := makeDialOnlySwarm(ctx, t)

	swarms := makeSwarms(ctx, t, 1)
	defer closeSwarms(swarms)
	s2 := swarms[0]

Steven Allen's avatar
Steven Allen committed
67
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), pstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
68

Steven Allen's avatar
Steven Allen committed
69
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
70 71 72 73 74 75 76 77 78 79 80 81
	if err != nil {
		t.Fatal(err)
	}

	s, err := c.NewStream()
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// acceptAndHang accepts connections from l until Accept returns an error,
// holding every accepted connection open without reading from or writing to
// it. Once accepting stops, all held connections are closed.
func acceptAndHang(l net.Listener) {
	held := make([]net.Conn, 0, 10)
	// Release everything we accepted once the accept loop ends.
	defer func() {
		for i := range held {
			held[i].Close()
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		held = append(held, conn)
	}
}

func TestSimultDials(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
Steven Allen's avatar
Steven Allen committed
103
	swarms := makeSwarms(ctx, t, 2, swarmt.OptDisableReuseport)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106 107 108 109

	// connect everyone
	{
		var wg sync.WaitGroup
		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
			// copy for other peer
Steven Allen's avatar
Steven Allen committed
110 111 112
			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.LocalPeer(), dst, addr)
			s.Peerstore().AddAddr(dst, addr, pstore.TempAddrTTL)
			if _, err := s.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
				t.Fatal("error swarm dialing to peer", err)
			}
			wg.Done()
		}

		ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}
		ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}

		log.Info("Connecting swarms simultaneously.")
		for i := 0; i < 10; i++ { // connect 10x for each.
			wg.Add(2)
Steven Allen's avatar
Steven Allen committed
130 131
			go connect(swarms[0], swarms[1].LocalPeer(), ifaceAddrs1[0])
			go connect(swarms[1], swarms[0].LocalPeer(), ifaceAddrs0[0])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135 136
		}
		wg.Wait()
	}

	// should still just have 1, at most 2 connections :)
Steven Allen's avatar
Steven Allen committed
137
	c01l := len(swarms[0].ConnsToPeer(swarms[1].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139 140
	if c01l > 2 {
		t.Error("0->1 has", c01l)
	}
Steven Allen's avatar
Steven Allen committed
141
	c10l := len(swarms[1].ConnsToPeer(swarms[0].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142 143 144 145 146 147 148 149 150 151 152
	if c10l > 2 {
		t.Error("1->0 has", c10l)
	}

	for _, s := range swarms {
		s.Close()
	}
}

func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
	dst := testutil.RandPeerIDFatal(t)
Matt Joiner's avatar
Matt Joiner committed
153
	lst, err := net.Listen("tcp4", ":0")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
	if err != nil {
		t.Fatal(err)
	}
	addr, err := manet.FromNetAddr(lst.Addr())
	if err != nil {
		t.Fatal(err)
	}
	addrs := []ma.Multiaddr{addr}
	addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	t.Log("new silent peer:", dst, addrs[0])
	return dst, addrs[0], lst
}

func TestDialWait(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 1)
	s1 := swarms[0]
	defer s1.Close()

	// dial to a non-existent peer.
	s2p, s2addr, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()
Steven Allen's avatar
Steven Allen committed
182
	s1.Peerstore().AddAddr(s2p, s2addr, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
185
	if c, err := s1.DialPeer(ctx, s2p); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187 188 189 190
		defer c.Close()
		t.Fatal("error swarm dialing to unknown peer worked...", err)
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
191
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192

193 194
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195
	}
196 197
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198 199
	}

Steven Allen's avatar
Steven Allen committed
200
	if !s1.Backoff().Backoff(s2p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201 202 203 204 205 206 207
		t.Error("s2 should now be on backoff")
	}
}

func TestDialBackoff(t *testing.T) {
	// t.Skip("skipping for another test")
	if ci.IsRunning() {
208
		t.Skip("travis will never have fun with this test")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	}

	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	s2addrs, err := s2.InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
224
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225 226 227 228 229

	// dial to a non-existent peer.
	s3p, s3addr, s3l := newSilentPeer(t)
	go acceptAndHang(s3l)
	defer s3l.Close()
Steven Allen's avatar
Steven Allen committed
230
	s1.Peerstore().AddAddr(s3p, s3addr, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246

	// in this test we will:
	//   1) dial 10x to each node.
	//   2) all dials should hang
	//   3) s1->s2 should succeed.
	//   4) s1->s3 should not (and should place s3 on backoff)
	//   5) disconnect entirely
	//   6) dial 10x to each node again
	//   7) s3 dials should all return immediately (except 1)
	//   8) s2 dials should all hang, and succeed
	//   9) last s3 dial ends, unsuccessful

	dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
247
				if _, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248 249 250 251 252 253 254 255 256 257 258 259 260 261
					t.Error("error dialing", dst, err)
					ch <- false
				} else {
					ch <- true
				}
			}()
		}
		return ch
	}

	dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
262
				if c, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263 264 265 266 267 268 269 270 271 272 273 274 275 276
					ch <- false
				} else {
					t.Error("succeeded in dialing", dst)
					ch <- true
					c.Close()
				}
			}()
		}
		return ch
	}

	{
		// 1) dial 10x to each node.
		N := 10
Steven Allen's avatar
Steven Allen committed
277
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278 279 280
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
281 282
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336

		// 2) all dials should hang
		select {
		case <-s2done:
			t.Error("s2 should not happen immediately")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-time.After(time.Millisecond):
			// s2 may finish very quickly, so let's get out.
		}

		// 3) s1->s2 should succeed.
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should not fail")
				}
			case <-s3done:
				t.Error("s3 should not happen yet")
			case <-dialTimeout1x:
				t.Error("s2 took too long")
			}
		}

		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-dialTimeout1x: // let it pass
		}

		// 4) s1->s3 should not (and should place s3 on backoff)
		// N-1 should finish before dialTimeout1x * 2
		for i := 0; i < N; i++ {
			select {
			case <-s2done:
				t.Error("s2 should have no more")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-(dialTimeout1x):
				if i < (N - 1) {
					t.Fatal("s3 took too long")
				}
				t.Log("dialTimeout1x * 1.3 hit for last peer")
			case <-dialTimeout10Ax:
				t.Fatal("s3 took too long")
			}
		}

		// check backoff state
Steven Allen's avatar
Steven Allen committed
337
		if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338 339
			t.Error("s2 should not be on backoff")
		}
Steven Allen's avatar
Steven Allen committed
340
		if !s1.Backoff().Backoff(s3p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341 342 343 344 345
			t.Error("s3 should be on backoff")
		}

		// 5) disconnect entirely

Steven Allen's avatar
Steven Allen committed
346
		for _, c := range s1.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
347 348
			c.Close()
		}
Steven Allen's avatar
Steven Allen committed
349
		for i := 0; i < 100 && len(s1.Conns()) > 0; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350 351
			<-time.After(time.Millisecond)
		}
Steven Allen's avatar
Steven Allen committed
352
		if len(s1.Conns()) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353 354 355 356 357 358 359
			t.Fatal("s1 conns must exit")
		}
	}

	{
		// 6) dial 10x to each node again
		N := 10
Steven Allen's avatar
Steven Allen committed
360
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
361 362 363
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
364 365
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406

		// 7) s3 dials should all return immediately (except 1)
		for i := 0; i < N-1; i++ {
			select {
			case <-s2done:
				t.Error("s2 should not succeed yet")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-dialTimeout1x:
				t.Fatal("s3 took too long")
			}
		}

		// 8) s2 dials should all hang, and succeed
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should succeed")
				}
			// case <-s3done:
			case <-(dialTimeout1x):
				t.Fatal("s3 took too long")
			}
		}

		// 9) the last s3 should return, failed.
		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case r := <-s3done:
			if r {
				t.Error("s3 should not succeed")
			}
		case <-dialTimeout10Ax:
			t.Fatal("s3 took too long")
		}

		// check backoff state (the same)
Steven Allen's avatar
Steven Allen committed
407
		if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
408 409
			t.Error("s2 should not be on backoff")
		}
Steven Allen's avatar
Steven Allen committed
410
		if !s1.Backoff().Backoff(s3p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
			t.Error("s3 should be on backoff")
		}
	}
}

func TestDialBackoffClears(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	// use another address first, that accept and hang on conns
	_, s2bad, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()

	// phase 1 -- dial to non-operational addresses
Steven Allen's avatar
Steven Allen committed
433
	s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434 435

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
436
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438 439 440 441
		t.Fatal("dialing to broken addr worked...", err)
		defer c.Close()
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
442
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
443

444 445
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
446
	}
447 448
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449 450
	}

Steven Allen's avatar
Steven Allen committed
451
	if !s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452 453 454 455 456 457 458 459 460 461
		t.Error("s2 should now be on backoff")
	} else {
		t.Log("correctly added to backoff")
	}

	// phase 2 -- add the working address. dial should succeed.
	ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
462
	s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
463

Steven Allen's avatar
Steven Allen committed
464
	if _, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
Jeromy's avatar
Jeromy committed
465 466 467
		t.Fatal("should have failed to dial backed off peer")
	}

Steven Allen's avatar
Steven Allen committed
468
	time.Sleep(BackoffBase)
Jeromy's avatar
Jeromy committed
469

Steven Allen's avatar
Steven Allen committed
470
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
471 472 473 474 475 476
		t.Fatal(err)
	} else {
		c.Close()
		t.Log("correctly connected")
	}

Steven Allen's avatar
Steven Allen committed
477
	if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
478 479 480 481 482
		t.Error("s2 should no longer be on backoff")
	} else {
		t.Log("correctly cleared backoff")
	}
}
tg's avatar
tg committed
483 484 485 486 487 488 489 490 491

func TestDialPeerFailed(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	testedSwarm, targetSwarm := swarms[0], swarms[1]

492 493
	expectedErrorsCount := 5
	for i := 0; i < expectedErrorsCount; i++ {
tg's avatar
tg committed
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
		_, silentPeerAddress, silentPeerListener := newSilentPeer(t)
		go acceptAndHang(silentPeerListener)
		defer silentPeerListener.Close()

		testedSwarm.Peerstore().AddAddr(
			targetSwarm.LocalPeer(),
			silentPeerAddress,
			pstore.PermanentAddrTTL)
	}

	_, err := testedSwarm.DialPeer(ctx, targetSwarm.LocalPeer())
	if err == nil {
		t.Fatal(err)
	}

509 510 511
	// dial_test.go:508: correctly get a combined error: failed to dial PEER: all dials failed
	//   * [/ip4/127.0.0.1/tcp/46485] failed to negotiate security protocol: context deadline exceeded
	//   * [/ip4/127.0.0.1/tcp/34881] failed to negotiate security protocol: context deadline exceeded
tg's avatar
tg committed
512 513
	// ...

514 515 516
	dialErr, ok := err.(*DialError)
	if !ok {
		t.Fatalf("expected *DialError, got %T", err)
tg's avatar
tg committed
517 518
	}

519 520
	if len(dialErr.DialErrors) != expectedErrorsCount {
		t.Errorf("expected %d errors, got %d", expectedErrorsCount, len(dialErr.DialErrors))
tg's avatar
tg committed
521 522
	}
}