package swarm_test
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3

import (
	"context"
	"net"
	"sync"
	"testing"
	"time"

	addrutil "github.com/libp2p/go-addr-util"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/transport"

	testutil "github.com/libp2p/go-libp2p-core/test"
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
	"github.com/libp2p/go-libp2p-testing/ci"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"

	. "github.com/libp2p/go-libp2p-swarm"
)

Steven Allen's avatar
Steven Allen committed
26
func init() {
27
	transport.DialTimeout = time.Second
Steven Allen's avatar
Steven Allen committed
28 29
}

Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
// closeSwarms calls Close on each swarm in swarms, in slice order. Whatever
// Close returns is ignored; this is best-effort test teardown.
func closeSwarms(swarms []*Swarm) {
	for i := range swarms {
		swarms[i].Close()
	}
}

Steven Allen's avatar
Steven Allen committed
36
func TestBasicDialPeer(t *testing.T) {
Jeromy's avatar
Jeromy committed
37 38 39 40 41 42 43 44
	t.Parallel()
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	s1 := swarms[0]
	s2 := swarms[1]

45
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
46

Steven Allen's avatar
Steven Allen committed
47
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
48 49 50 51
	if err != nil {
		t.Fatal(err)
	}

52
	s, err := c.NewStream(ctx)
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

func TestDialWithNoListeners(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	s1 := makeDialOnlySwarm(ctx, t)

	swarms := makeSwarms(ctx, t, 1)
	defer closeSwarms(swarms)
	s2 := swarms[0]

70
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
71

Steven Allen's avatar
Steven Allen committed
72
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
73 74 75 76
	if err != nil {
		t.Fatal(err)
	}

77
	s, err := c.NewStream(ctx)
Jeromy's avatar
Jeromy committed
78 79 80 81 82 83 84
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
// acceptAndHang accepts connections from l until Accept returns an error.
// Accepted connections are neither read from nor written to; they are held
// open and closed only once the accept loop has ended. A nil connection
// returned without an error is skipped.
func acceptAndHang(l net.Listener) {
	var held []net.Conn
	// Runs once the accept loop below returns.
	defer func() {
		for i := range held {
			held[i].Close()
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		held = append(held, conn)
	}
}

func TestSimultDials(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
Steven Allen's avatar
Steven Allen committed
106
	swarms := makeSwarms(ctx, t, 2, swarmt.OptDisableReuseport)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109 110 111 112

	// connect everyone
	{
		var wg sync.WaitGroup
		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
			// copy for other peer
Steven Allen's avatar
Steven Allen committed
113
			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.LocalPeer(), dst, addr)
114
			s.Peerstore().AddAddr(dst, addr, peerstore.TempAddrTTL)
Steven Allen's avatar
Steven Allen committed
115
			if _, err := s.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
				t.Fatal("error swarm dialing to peer", err)
			}
			wg.Done()
		}

		ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}
		ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}

		log.Info("Connecting swarms simultaneously.")
		for i := 0; i < 10; i++ { // connect 10x for each.
			wg.Add(2)
Steven Allen's avatar
Steven Allen committed
133 134
			go connect(swarms[0], swarms[1].LocalPeer(), ifaceAddrs1[0])
			go connect(swarms[1], swarms[0].LocalPeer(), ifaceAddrs0[0])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136 137 138 139
		}
		wg.Wait()
	}

	// should still just have 1, at most 2 connections :)
Steven Allen's avatar
Steven Allen committed
140
	c01l := len(swarms[0].ConnsToPeer(swarms[1].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141 142 143
	if c01l > 2 {
		t.Error("0->1 has", c01l)
	}
Steven Allen's avatar
Steven Allen committed
144
	c10l := len(swarms[1].ConnsToPeer(swarms[0].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145 146 147 148 149 150 151 152 153 154 155
	if c10l > 2 {
		t.Error("1->0 has", c10l)
	}

	for _, s := range swarms {
		s.Close()
	}
}

func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
	dst := testutil.RandPeerIDFatal(t)
156
	lst, err := net.Listen("tcp4", "localhost:0")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
	if err != nil {
		t.Fatal(err)
	}
	addr, err := manet.FromNetAddr(lst.Addr())
	if err != nil {
		t.Fatal(err)
	}
	addrs := []ma.Multiaddr{addr}
	addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
	if err != nil {
		t.Fatal(err)
	}
	t.Log("new silent peer:", dst, addrs[0])
	return dst, addrs[0], lst
}

func TestDialWait(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 1)
	s1 := swarms[0]
	defer s1.Close()

	// dial to a non-existent peer.
	s2p, s2addr, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()
185
	s1.Peerstore().AddAddr(s2p, s2addr, peerstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
188
	if c, err := s1.DialPeer(ctx, s2p); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189 190 191 192 193
		defer c.Close()
		t.Fatal("error swarm dialing to unknown peer worked...", err)
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
194
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195

196 197
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	}
199 200
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201 202
	}

Will Scott's avatar
Will Scott committed
203
	if !s1.Backoff().Backoff(s2p, s2addr) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205 206 207 208 209 210
		t.Error("s2 should now be on backoff")
	}
}

func TestDialBackoff(t *testing.T) {
	// t.Skip("skipping for another test")
	if ci.IsRunning() {
211
		t.Skip("travis will never have fun with this test")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
	}

	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	s2addrs, err := s2.InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
227
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, peerstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228 229 230 231 232

	// dial to a non-existent peer.
	s3p, s3addr, s3l := newSilentPeer(t)
	go acceptAndHang(s3l)
	defer s3l.Close()
233
	s1.Peerstore().AddAddr(s3p, s3addr, peerstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249

	// in this test we will:
	//   1) dial 10x to each node.
	//   2) all dials should hang
	//   3) s1->s2 should succeed.
	//   4) s1->s3 should not (and should place s3 on backoff)
	//   5) disconnect entirely
	//   6) dial 10x to each node again
	//   7) s3 dials should all return immediately (except 1)
	//   8) s2 dials should all hang, and succeed
	//   9) last s3 dial ends, unsuccessful

	dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
250
				if _, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
251 252 253 254 255 256 257 258 259 260 261 262 263 264
					t.Error("error dialing", dst, err)
					ch <- false
				} else {
					ch <- true
				}
			}()
		}
		return ch
	}

	dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
265
				if c, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266 267 268 269 270 271 272 273 274 275 276 277 278 279
					ch <- false
				} else {
					t.Error("succeeded in dialing", dst)
					ch <- true
					c.Close()
				}
			}()
		}
		return ch
	}

	{
		// 1) dial 10x to each node.
		N := 10
Steven Allen's avatar
Steven Allen committed
280
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281 282 283
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
284 285
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339

		// 2) all dials should hang
		select {
		case <-s2done:
			t.Error("s2 should not happen immediately")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-time.After(time.Millisecond):
			// s2 may finish very quickly, so let's get out.
		}

		// 3) s1->s2 should succeed.
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should not fail")
				}
			case <-s3done:
				t.Error("s3 should not happen yet")
			case <-dialTimeout1x:
				t.Error("s2 took too long")
			}
		}

		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-dialTimeout1x: // let it pass
		}

		// 4) s1->s3 should not (and should place s3 on backoff)
		// N-1 should finish before dialTimeout1x * 2
		for i := 0; i < N; i++ {
			select {
			case <-s2done:
				t.Error("s2 should have no more")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-(dialTimeout1x):
				if i < (N - 1) {
					t.Fatal("s3 took too long")
				}
				t.Log("dialTimeout1x * 1.3 hit for last peer")
			case <-dialTimeout10Ax:
				t.Fatal("s3 took too long")
			}
		}

		// check backoff state
Will Scott's avatar
Will Scott committed
340
		if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341 342
			t.Error("s2 should not be on backoff")
		}
Will Scott's avatar
Will Scott committed
343
		if !s1.Backoff().Backoff(s3p, s3addr) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
344 345 346 347 348
			t.Error("s3 should be on backoff")
		}

		// 5) disconnect entirely

Steven Allen's avatar
Steven Allen committed
349
		for _, c := range s1.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350 351
			c.Close()
		}
Steven Allen's avatar
Steven Allen committed
352
		for i := 0; i < 100 && len(s1.Conns()) > 0; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
353 354
			<-time.After(time.Millisecond)
		}
Steven Allen's avatar
Steven Allen committed
355
		if len(s1.Conns()) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
356 357 358 359 360 361 362
			t.Fatal("s1 conns must exit")
		}
	}

	{
		// 6) dial 10x to each node again
		N := 10
Steven Allen's avatar
Steven Allen committed
363
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
364 365 366
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
367 368
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409

		// 7) s3 dials should all return immediately (except 1)
		for i := 0; i < N-1; i++ {
			select {
			case <-s2done:
				t.Error("s2 should not succeed yet")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-dialTimeout1x:
				t.Fatal("s3 took too long")
			}
		}

		// 8) s2 dials should all hang, and succeed
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should succeed")
				}
			// case <-s3done:
			case <-(dialTimeout1x):
				t.Fatal("s3 took too long")
			}
		}

		// 9) the last s3 should return, failed.
		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case r := <-s3done:
			if r {
				t.Error("s3 should not succeed")
			}
		case <-dialTimeout10Ax:
			t.Fatal("s3 took too long")
		}

		// check backoff state (the same)
Will Scott's avatar
Will Scott committed
410
		if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
411 412
			t.Error("s2 should not be on backoff")
		}
Will Scott's avatar
Will Scott committed
413
		if !s1.Backoff().Backoff(s3p, s3addr) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
			t.Error("s3 should be on backoff")
		}
	}
}

func TestDialBackoffClears(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	// use another address first, that accept and hang on conns
	_, s2bad, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()

	// phase 1 -- dial to non-operational addresses
436
	s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
439
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440 441 442 443 444
		t.Fatal("dialing to broken addr worked...", err)
		defer c.Close()
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
445
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
446

447 448
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449
	}
450 451
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452 453
	}

Will Scott's avatar
Will Scott committed
454
	if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
455 456 457 458 459 460 461 462 463 464
		t.Error("s2 should now be on backoff")
	} else {
		t.Log("correctly added to backoff")
	}

	// phase 2 -- add the working address. dial should succeed.
	ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
465
	s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466

Will Scott's avatar
Will Scott committed
467 468 469
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
		c.Close()
		t.Log("backoffs are per address, not peer")
Jeromy's avatar
Jeromy committed
470 471
	}

Steven Allen's avatar
Steven Allen committed
472
	time.Sleep(BackoffBase)
Jeromy's avatar
Jeromy committed
473

Steven Allen's avatar
Steven Allen committed
474
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
475 476 477 478 479 480
		t.Fatal(err)
	} else {
		c.Close()
		t.Log("correctly connected")
	}

Will Scott's avatar
Will Scott committed
481
	if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
482 483 484 485 486
		t.Error("s2 should no longer be on backoff")
	} else {
		t.Log("correctly cleared backoff")
	}
}
tg's avatar
tg committed
487 488 489 490 491 492 493 494 495

func TestDialPeerFailed(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	testedSwarm, targetSwarm := swarms[0], swarms[1]

496 497
	expectedErrorsCount := 5
	for i := 0; i < expectedErrorsCount; i++ {
tg's avatar
tg committed
498 499 500 501 502 503 504
		_, silentPeerAddress, silentPeerListener := newSilentPeer(t)
		go acceptAndHang(silentPeerListener)
		defer silentPeerListener.Close()

		testedSwarm.Peerstore().AddAddr(
			targetSwarm.LocalPeer(),
			silentPeerAddress,
505
			peerstore.PermanentAddrTTL)
tg's avatar
tg committed
506 507 508 509 510 511 512
	}

	_, err := testedSwarm.DialPeer(ctx, targetSwarm.LocalPeer())
	if err == nil {
		t.Fatal(err)
	}

513 514 515
	// dial_test.go:508: correctly get a combined error: failed to dial PEER: all dials failed
	//   * [/ip4/127.0.0.1/tcp/46485] failed to negotiate security protocol: context deadline exceeded
	//   * [/ip4/127.0.0.1/tcp/34881] failed to negotiate security protocol: context deadline exceeded
tg's avatar
tg committed
516 517
	// ...

518 519 520
	dialErr, ok := err.(*DialError)
	if !ok {
		t.Fatalf("expected *DialError, got %T", err)
tg's avatar
tg committed
521 522
	}

523 524
	if len(dialErr.DialErrors) != expectedErrorsCount {
		t.Errorf("expected %d errors, got %d", expectedErrorsCount, len(dialErr.DialErrors))
tg's avatar
tg committed
525 526
	}
}
vyzo's avatar
vyzo committed
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552

// TestDialExistingConnection dials the same peer twice in a row and checks
// that both calls return the same connection value.
func TestDialExistingConnection(t *testing.T) {
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	dialer, listener := swarms[0], swarms[1]

	dialer.Peerstore().AddAddrs(listener.LocalPeer(), listener.ListenAddresses(), peerstore.PermanentAddrTTL)

	first, err := dialer.DialPeer(ctx, listener.LocalPeer())
	if err != nil {
		t.Fatal(err)
	}
	second, err := dialer.DialPeer(ctx, listener.LocalPeer())
	if err != nil {
		t.Fatal(err)
	}

	// The second dial is expected to return the connection the first one made.
	if first != second {
		t.Fatal("expecting the same connection from both dials")
	}
}