dial_test.go 11.1 KB
Newer Older
Steven Allen's avatar
Steven Allen committed
1
package swarm_test
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9
	"net"
	"sync"
	"testing"
	"time"

Jeromy's avatar
Jeromy committed
10 11 12
	addrutil "github.com/libp2p/go-addr-util"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
13
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
14
	transport "github.com/libp2p/go-libp2p-transport"
Jeromy's avatar
Jeromy committed
15 16 17 18
	testutil "github.com/libp2p/go-testutil"
	ci "github.com/libp2p/go-testutil/ci"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
Steven Allen's avatar
Steven Allen committed
19 20

	. "github.com/libp2p/go-libp2p-swarm"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
)

Steven Allen's avatar
Steven Allen committed
23
func init() {
24
	transport.DialTimeout = time.Second
Steven Allen's avatar
Steven Allen committed
25 26
}

Jeromy's avatar
Jeromy committed
27 28 29 30 31 32
// closeSwarms calls Close on every swarm in the slice, in order. Any value
// returned by Close is discarded; this is a best-effort test teardown helper.
func closeSwarms(swarms []*Swarm) {
	for i := range swarms {
		swarms[i].Close()
	}
}

Steven Allen's avatar
Steven Allen committed
33
func TestBasicDialPeer(t *testing.T) {
Jeromy's avatar
Jeromy committed
34 35 36 37 38 39 40 41
	t.Parallel()
	ctx := context.Background()

	swarms := makeSwarms(ctx, t, 2)
	defer closeSwarms(swarms)
	s1 := swarms[0]
	s2 := swarms[1]

Steven Allen's avatar
Steven Allen committed
42
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), pstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
43

Steven Allen's avatar
Steven Allen committed
44
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
	if err != nil {
		t.Fatal(err)
	}

	s, err := c.NewStream()
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

func TestDialWithNoListeners(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	s1 := makeDialOnlySwarm(ctx, t)

	swarms := makeSwarms(ctx, t, 1)
	defer closeSwarms(swarms)
	s2 := swarms[0]

Steven Allen's avatar
Steven Allen committed
67
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), pstore.PermanentAddrTTL)
Jeromy's avatar
Jeromy committed
68

Steven Allen's avatar
Steven Allen committed
69
	c, err := s1.DialPeer(ctx, s2.LocalPeer())
Jeromy's avatar
Jeromy committed
70 71 72 73 74 75 76 77 78 79 80 81
	if err != nil {
		t.Fatal(err)
	}

	s, err := c.NewStream()
	if err != nil {
		t.Fatal(err)
	}

	s.Close()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// acceptAndHang accepts connections from l and holds on to them without ever
// reading or writing, so the remote side sees an open but silent connection.
// It keeps accepting until Accept returns an error (for example because the
// listener was closed), then closes every connection it was holding and
// returns.
func acceptAndHang(l net.Listener) {
	held := make([]net.Conn, 0, 10)

	// Release everything we kept open once the accept loop ends.
	defer func() {
		for i := range held {
			held[i].Close()
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		held = append(held, conn)
	}
}

func TestSimultDials(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
Steven Allen's avatar
Steven Allen committed
103
	swarms := makeSwarms(ctx, t, 2, swarmt.OptDisableReuseport)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106 107 108 109

	// connect everyone
	{
		var wg sync.WaitGroup
		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
			// copy for other peer
Steven Allen's avatar
Steven Allen committed
110 111 112
			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.LocalPeer(), dst, addr)
			s.Peerstore().AddAddr(dst, addr, pstore.TempAddrTTL)
			if _, err := s.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
				t.Fatal("error swarm dialing to peer", err)
			}
			wg.Done()
		}

		ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}
		ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
		if err != nil {
			t.Fatal(err)
		}

		log.Info("Connecting swarms simultaneously.")
		for i := 0; i < 10; i++ { // connect 10x for each.
			wg.Add(2)
Steven Allen's avatar
Steven Allen committed
130 131
			go connect(swarms[0], swarms[1].LocalPeer(), ifaceAddrs1[0])
			go connect(swarms[1], swarms[0].LocalPeer(), ifaceAddrs0[0])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134 135 136
		}
		wg.Wait()
	}

	// should still just have 1, at most 2 connections :)
Steven Allen's avatar
Steven Allen committed
137
	c01l := len(swarms[0].ConnsToPeer(swarms[1].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139 140
	if c01l > 2 {
		t.Error("0->1 has", c01l)
	}
Steven Allen's avatar
Steven Allen committed
141
	c10l := len(swarms[1].ConnsToPeer(swarms[0].LocalPeer()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
	if c10l > 2 {
		t.Error("1->0 has", c10l)
	}

	for _, s := range swarms {
		s.Close()
	}
}

// newSilentPeer creates the ingredients for a peer that can be dialed at the
// TCP level but has no swarm behind it: a random peer ID, a TCP listener on
// an OS-chosen port, and a multiaddr for that listener with the unspecified
// address resolved. The caller owns the returned listener and must close it.
// Any failure aborts the test via t.Fatal.
func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
	dst := testutil.RandPeerIDFatal(t)
	lst, err := net.Listen("tcp", ":0")
	if err != nil {
		t.Fatal(err)
	}
	addr, err := manet.FromNetAddr(lst.Addr())
	if err != nil {
		// The caller never receives the listener on this path, so release it
		// here instead of leaking the socket.
		lst.Close()
		t.Fatal(err)
	}
	addrs := []ma.Multiaddr{addr}
	addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
	if err != nil {
		lst.Close()
		t.Fatal(err)
	}
	if len(addrs) == 0 {
		lst.Close()
		t.Fatal("no addresses after resolving unspecified listen address")
	}
	t.Log("new silent peer:", dst, addrs[0])
	return dst, addrs[0], lst
}

func TestDialWait(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 1)
	s1 := swarms[0]
	defer s1.Close()

	// dial to a non-existent peer.
	s2p, s2addr, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()
Steven Allen's avatar
Steven Allen committed
183
	s1.Peerstore().AddAddr(s2p, s2addr, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184 185

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
186
	if c, err := s1.DialPeer(ctx, s2p); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188 189 190 191
		defer c.Close()
		t.Fatal("error swarm dialing to unknown peer worked...", err)
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
192
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193

194 195
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196
	}
197 198
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200
	}

Steven Allen's avatar
Steven Allen committed
201
	if !s1.Backoff().Backoff(s2p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203 204 205 206 207 208
		t.Error("s2 should now be on backoff")
	}
}

func TestDialBackoff(t *testing.T) {
	// t.Skip("skipping for another test")
	if ci.IsRunning() {
209
		t.Skip("travis will never have fun with this test")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
	}

	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	s2addrs, err := s2.InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
225
	s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227 228 229 230

	// dial to a non-existent peer.
	s3p, s3addr, s3l := newSilentPeer(t)
	go acceptAndHang(s3l)
	defer s3l.Close()
Steven Allen's avatar
Steven Allen committed
231
	s1.Peerstore().AddAddr(s3p, s3addr, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

	// in this test we will:
	//   1) dial 10x to each node.
	//   2) all dials should hang
	//   3) s1->s2 should succeed.
	//   4) s1->s3 should not (and should place s3 on backoff)
	//   5) disconnect entirely
	//   6) dial 10x to each node again
	//   7) s3 dials should all return immediately (except 1)
	//   8) s2 dials should all hang, and succeed
	//   9) last s3 dial ends, unsuccessful

	dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
248
				if _, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
249 250 251 252 253 254 255 256 257 258 259 260 261 262
					t.Error("error dialing", dst, err)
					ch <- false
				} else {
					ch <- true
				}
			}()
		}
		return ch
	}

	dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
		ch := make(chan bool)
		for i := 0; i < times; i++ {
			go func() {
Steven Allen's avatar
Steven Allen committed
263
				if c, err := s1.DialPeer(ctx, dst); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264 265 266 267 268 269 270 271 272 273 274 275 276 277
					ch <- false
				} else {
					t.Error("succeeded in dialing", dst)
					ch <- true
					c.Close()
				}
			}()
		}
		return ch
	}

	{
		// 1) dial 10x to each node.
		N := 10
Steven Allen's avatar
Steven Allen committed
278
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
279 280 281
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
282 283
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337

		// 2) all dials should hang
		select {
		case <-s2done:
			t.Error("s2 should not happen immediately")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-time.After(time.Millisecond):
			// s2 may finish very quickly, so let's get out.
		}

		// 3) s1->s2 should succeed.
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should not fail")
				}
			case <-s3done:
				t.Error("s3 should not happen yet")
			case <-dialTimeout1x:
				t.Error("s2 took too long")
			}
		}

		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case <-s3done:
			t.Error("s3 should not happen yet")
		case <-dialTimeout1x: // let it pass
		}

		// 4) s1->s3 should not (and should place s3 on backoff)
		// N-1 should finish before dialTimeout1x * 2
		for i := 0; i < N; i++ {
			select {
			case <-s2done:
				t.Error("s2 should have no more")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-(dialTimeout1x):
				if i < (N - 1) {
					t.Fatal("s3 took too long")
				}
				t.Log("dialTimeout1x * 1.3 hit for last peer")
			case <-dialTimeout10Ax:
				t.Fatal("s3 took too long")
			}
		}

		// check backoff state
Steven Allen's avatar
Steven Allen committed
338
		if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339 340
			t.Error("s2 should not be on backoff")
		}
Steven Allen's avatar
Steven Allen committed
341
		if !s1.Backoff().Backoff(s3p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342 343 344 345 346
			t.Error("s3 should be on backoff")
		}

		// 5) disconnect entirely

Steven Allen's avatar
Steven Allen committed
347
		for _, c := range s1.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
348 349
			c.Close()
		}
Steven Allen's avatar
Steven Allen committed
350
		for i := 0; i < 100 && len(s1.Conns()) > 0; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
351 352
			<-time.After(time.Millisecond)
		}
Steven Allen's avatar
Steven Allen committed
353
		if len(s1.Conns()) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
354 355 356 357 358 359 360
			t.Fatal("s1 conns must exit")
		}
	}

	{
		// 6) dial 10x to each node again
		N := 10
Steven Allen's avatar
Steven Allen committed
361
		s2done := dialOnlineNode(s2.LocalPeer(), N)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362 363 364
		s3done := dialOfflineNode(s3p, N)

		// when all dials should be done by:
365 366
		dialTimeout1x := time.After(transport.DialTimeout)
		dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407

		// 7) s3 dials should all return immediately (except 1)
		for i := 0; i < N-1; i++ {
			select {
			case <-s2done:
				t.Error("s2 should not succeed yet")
			case r := <-s3done:
				if r {
					t.Error("s3 should not succeed")
				}
			case <-dialTimeout1x:
				t.Fatal("s3 took too long")
			}
		}

		// 8) s2 dials should all hang, and succeed
		for i := 0; i < N; i++ {
			select {
			case r := <-s2done:
				if !r {
					t.Error("s2 should succeed")
				}
			// case <-s3done:
			case <-(dialTimeout1x):
				t.Fatal("s3 took too long")
			}
		}

		// 9) the last s3 should return, failed.
		select {
		case <-s2done:
			t.Error("s2 should have no more")
		case r := <-s3done:
			if r {
				t.Error("s3 should not succeed")
			}
		case <-dialTimeout10Ax:
			t.Fatal("s3 took too long")
		}

		// check backoff state (the same)
Steven Allen's avatar
Steven Allen committed
408
		if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
409 410
			t.Error("s2 should not be on backoff")
		}
Steven Allen's avatar
Steven Allen committed
411
		if !s1.Backoff().Backoff(s3p) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
			t.Error("s3 should be on backoff")
		}
	}
}

func TestDialBackoffClears(t *testing.T) {
	// t.Skip("skipping for another test")
	t.Parallel()

	ctx := context.Background()
	swarms := makeSwarms(ctx, t, 2)
	s1 := swarms[0]
	s2 := swarms[1]
	defer s1.Close()
	defer s2.Close()

	// use another address first, that accept and hang on conns
	_, s2bad, s2l := newSilentPeer(t)
	go acceptAndHang(s2l)
	defer s2l.Close()

	// phase 1 -- dial to non-operational addresses
Steven Allen's avatar
Steven Allen committed
434
	s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
435 436

	before := time.Now()
Steven Allen's avatar
Steven Allen committed
437
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438 439 440 441 442
		t.Fatal("dialing to broken addr worked...", err)
		defer c.Close()
	} else {
		t.Log("correctly got error:", err)
	}
John Steidley's avatar
John Steidley committed
443
	duration := time.Since(before)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444

445 446
	if duration < transport.DialTimeout*DialAttempts {
		t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447
	}
448 449
	if duration > 2*transport.DialTimeout*DialAttempts {
		t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
450 451
	}

Steven Allen's avatar
Steven Allen committed
452
	if !s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
453 454 455 456 457 458 459 460 461 462
		t.Error("s2 should now be on backoff")
	} else {
		t.Log("correctly added to backoff")
	}

	// phase 2 -- add the working address. dial should succeed.
	ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
463
	s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, pstore.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464

Steven Allen's avatar
Steven Allen committed
465
	if _, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
Jeromy's avatar
Jeromy committed
466 467 468
		t.Fatal("should have failed to dial backed off peer")
	}

Steven Allen's avatar
Steven Allen committed
469
	time.Sleep(BackoffBase)
Jeromy's avatar
Jeromy committed
470

Steven Allen's avatar
Steven Allen committed
471
	if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
472 473 474 475 476 477
		t.Fatal(err)
	} else {
		c.Close()
		t.Log("correctly connected")
	}

Steven Allen's avatar
Steven Allen committed
478
	if s1.Backoff().Backoff(s2.LocalPeer()) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
479 480 481 482 483
		t.Error("s2 should no longer be on backoff")
	} else {
		t.Log("correctly cleared backoff")
	}
}