dht_test.go 35.8 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
8
	"math/rand"
9
	"sort"
10
	"strings"
11
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	"testing"
13
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14

15 16 17
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"
18
	"github.com/multiformats/go-multistream"
19 20 21 22 23 24

	"golang.org/x/xerrors"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

25 26
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
27

28
	"github.com/ipfs/go-cid"
29
	u "github.com/ipfs/go-ipfs-util"
30
	kb "github.com/libp2p/go-libp2p-kbucket"
31
	"github.com/libp2p/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
32
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
33
	"github.com/libp2p/go-libp2p-testing/ci"
34
	travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
Jeromy's avatar
Jeromy committed
35
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
36
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38
)

39
var testCaseCids []cid.Cid
40 41 42 43

func init() {
	for i := 0; i < 100; i++ {
		v := fmt.Sprintf("%d -- value", i)
44 45 46

		mhv := u.Hash([]byte(v))
		testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
47 48 49
	}
}

50 51 52 53 54 55 56 57 58 59
type blankValidator struct{}

func (blankValidator) Validate(_ string, _ []byte) error        { return nil }
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

type testValidator struct{}

func (testValidator) Select(_ string, bs [][]byte) (int, error) {
	index := -1
	for i, b := range bs {
60
		if bytes.Equal(b, []byte("newer")) {
61
			index = i
62
		} else if bytes.Equal(b, []byte("valid")) {
63 64 65 66 67 68 69 70 71 72 73
			if index == -1 {
				index = i
			}
		}
	}
	if index == -1 {
		return -1, errors.New("no rec found")
	}
	return index, nil
}
func (testValidator) Validate(_ string, b []byte) error {
74
	if bytes.Equal(b, []byte("expired")) {
75 76 77 78 79
		return errors.New("expired")
	}
	return nil
}

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
type testAtomicPutValidator struct {
	testValidator
}

// selects the entry with the 'highest' last byte
func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {
	index := -1
	max := uint8(0)
	for i, b := range bs {
		if bytes.Equal(b, []byte("valid")) {
			if index == -1 {
				index = i
			}
			continue
		}

		str := string(b)
		n := str[len(str)-1]
		if n > max {
			max = n
			index = i
		}

	}
	if index == -1 {
		return -1, errors.New("no rec found")
	}
	return index, nil
}

110
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
111 112
	d, err := New(
		ctx,
Steven Allen's avatar
Steven Allen committed
113
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
114 115
		opts.Client(client),
		opts.NamespacedValidator("v", blankValidator{}),
116
		opts.DisableAutoRefresh(),
117 118 119
	)
	if err != nil {
		t.Fatal(err)
120
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123
	return d
}

124
func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
125
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126
	dhts := make([]*IpfsDHT, n)
127 128
	peers := make([]peer.ID, n)

129 130 131
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
	for i := 0; i < n; i++ {
133
		dhts[i] = setupDHT(ctx, t, false)
134 135
		peers[i] = dhts[i].PeerID()
		addrs[i] = dhts[i].host.Addrs()[0]
136 137

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
138
			t.Fatal("While setting up DHTs address got duplicated.")
139 140 141 142
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
143
			t.Fatal("While setting up DHTs peerid got duplicated.")
144 145 146
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147 148
	}

149
	return dhts
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151
}

152
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
153 154
	t.Helper()

155
	idB := b.self
156
	addrB := b.peerstore.Addrs(idB)
157 158
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	}
160

161 162
	a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
	pi := peer.AddrInfo{ID: idB}
Jeromy's avatar
Jeromy committed
163
	if err := a.host.Connect(ctx, pi); err != nil {
164
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	}
166 167
}

168 169
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
170

171 172
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
173
	for a.routingTable.Find(b.self) == "" {
174 175 176 177 178
		select {
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		case <-time.After(time.Millisecond * 5):
		}
179
	}
180
}
181

182 183 184 185 186
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
	connectNoSync(t, ctx, a, b)
	wait(t, ctx, a, b)
	wait(t, ctx, b, a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
190

191
	ctx, cancel := context.WithCancel(ctx)
Steven Allen's avatar
Steven Allen committed
192 193
	defer cancel()

194
	logger.Debugf("refreshing DHTs routing tables...")
195 196 197 198 199 200 201 202 203

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
204 205 206 207 208 209 210 211
		select {
		case err := <-dht.RefreshRoutingTable():
			if err != nil {
				t.Error(err)
			}
		case <-ctx.Done():
			return
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213 214
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215
func TestValueGetSet(t *testing.T) {
216 217
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
218

Steven Allen's avatar
Steven Allen committed
219
	var dhts [5]*IpfsDHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220

Steven Allen's avatar
Steven Allen committed
221 222 223 224 225
	for i := range dhts {
		dhts[i] = setupDHT(ctx, t, false)
		defer dhts[i].Close()
		defer dhts[i].host.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226

Steven Allen's avatar
Steven Allen committed
227
	connect(t, ctx, dhts[0], dhts[1])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228

Steven Allen's avatar
Steven Allen committed
229
	t.Log("adding value on: ", dhts[0].self)
Jeromy's avatar
Jeromy committed
230 231
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
232
	err := dhts[0].PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233 234 235 236
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
237
	t.Log("requesting value on dhts: ", dhts[1].self)
Jeromy's avatar
Jeromy committed
238 239
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280

	val, err := dhts[1].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	// late connect

	connect(t, ctx, dhts[2], dhts[0])
	connect(t, ctx, dhts[2], dhts[1])

	t.Log("requesting value (offline) on dhts: ", dhts[2].self)
	vala, err := dhts[2].GetValue(ctxT, "/v/hello", Quorum(0))
	if vala != nil {
		t.Fatalf("offline get should have failed, got %s", string(vala))
	}
	if err != routing.ErrNotFound {
		t.Fatalf("offline get should have failed with ErrNotFound, got: %s", err)
	}

	t.Log("requesting value (online) on dhts: ", dhts[2].self)
	val, err = dhts[2].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	for _, d := range dhts[:3] {
		connect(t, ctx, dhts[3], d)
	}
	connect(t, ctx, dhts[4], dhts[3])

	t.Log("requesting value (requires peer routing) on dhts: ", dhts[4].self)
	val, err = dhts[4].GetValue(ctxT, "/v/hello")
281 282 283 284
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
285 286
	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
287
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288 289
}

290 291 292 293 294 295 296 297 298 299 300 301
func TestValueSetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

302
	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
	dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, failset bool, exp string, experr error) {
		t.Helper()

		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if failset {
			if err == nil {
				t.Error("expected set to fail")
			}
		} else {
			if err != nil {
				t.Error(err)
			}
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
			t.Errorf("Set/Get %v: Expected %v error but got %v", val, experr, err)
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
		}
	}

	// Expired records should not be set
	testSetGet("expired", true, "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", false, "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", false, "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", true, "newer", nil)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
func TestSearchValue(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
370
	valCh, err := dhtA.SearchValue(ctxT, "/v/hello", Quorum(-1))
371 372 373
	if err != nil {
		t.Fatal(err)
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

	select {
	case v := <-valCh:
		if string(v) != "valid" {
			t.Errorf("expected 'valid', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}

	err = dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	select {
	case v := <-valCh:
		if string(v) != "newer" {
			t.Errorf("expected 'newer', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}
}

399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
func TestGetValues(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
	vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
	if err != nil {
		t.Fatal(err)
	}

	if len(vals) != 2 {
		t.Fatalf("expected to get 2 values, got %d", len(vals))
	}

437
	sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })
438 439 440 441 442 443 444 445 446

	if string(vals[0].Val) != "valid" {
		t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
	}
	if string(vals[1].Val) != "valid" {
		t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
	}
}

447 448 449 450 451 452 453 454 455 456 457 458 459
func TestValueGetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
460
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
461 462 463 464

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, exp string, experr error) {
465 466
		t.Helper()

467 468 469 470
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if err != nil {
471
			t.Error(err)
472 473 474 475 476 477
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
Łukasz Magiera's avatar
Łukasz Magiera committed
478
			t.Errorf("Set/Get %v: Expected '%v' error but got '%v'", val, experr, err)
479 480
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
481 482 483 484 485 486 487 488 489 490 491 492 493
		}
	}

	// Expired records should not be returned
	testSetGet("expired", "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", "newer", nil)
}

494
func TestInvalidMessageSenderTracking(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
495 496 497
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

498
	dht := setupDHT(ctx, t, false)
Steven Allen's avatar
Steven Allen committed
499 500
	defer dht.Close()

501
	foo := peer.ID("asdasd")
Steven Allen's avatar
Steven Allen committed
502
	_, err := dht.messageSenderForPeer(ctx, foo)
503 504 505 506 507
	if err == nil {
		t.Fatal("that shouldnt have succeeded")
	}

	dht.smlk.Lock()
Steven Allen's avatar
Steven Allen committed
508 509 510 511
	mscnt := len(dht.strmap)
	dht.smlk.Unlock()

	if mscnt > 0 {
512 513 514 515
		t.Fatal("should have no message senders in map")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
516 517
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
518 519
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
520

521
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
522 523
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
525
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
526 527 528
		}
	}()

529 530 531
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532

533
	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
534
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
535
		if err := dhts[3].Provide(ctx, k, true); err != nil {
536 537
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
538 539
	}

540 541 542 543
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
544
	for _, c := range testCaseCids {
545 546
		n = (n + 1) % 3

Matt Joiner's avatar
Matt Joiner committed
547
		logger.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
548 549
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
550
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

Jeromy's avatar
Jeromy committed
566 567
func TestLocalProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
568 569
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Jeromy's avatar
Jeromy committed
570

571
	dhts := setupDHTS(t, ctx, 4)
Jeromy's avatar
Jeromy committed
572 573 574 575 576 577 578 579 580 581 582 583
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
584
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601
		if err := dhts[3].Provide(ctx, k, false); err != nil {
			t.Fatal(err)
		}
	}

	time.Sleep(time.Millisecond * 10)

	for _, c := range testCaseCids {
		for i := 0; i < 3; i++ {
			provs := dhts[i].providers.GetProviders(ctx, c)
			if len(provs) > 0 {
				t.Fatal("shouldnt know this")
			}
		}
	}
}

602 603 604 605 606 607 608 609 610 611
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
612
				//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
Matt Joiner's avatar
Matt Joiner committed
629
			logger.Debugf("did not reach well-formed routing tables by %s", timeout)
630 631 632 633 634 635 636 637 638 639 640
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
641
	fmt.Printf("checking routing table of %d\n", len(dhts))
642 643 644 645 646 647 648
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

649
func TestRefresh(t *testing.T) {
650 651 652 653
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
654 655
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
656

657
	nDHTs := 30
658
	dhts := setupDHTS(t, ctx, nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
659 660 661
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
662
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
663 664 665 666 667 668 669 670
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

671
	<-time.After(100 * time.Millisecond)
672
	// bootstrap a few times until we get good tables.
Steven Allen's avatar
Steven Allen committed
673 674 675 676
	t.Logf("bootstrapping them so they find each other %d", nDHTs)
	ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
	defer cancelT()

677
	go func() {
Steven Allen's avatar
Steven Allen committed
678
		for ctxT.Err() == nil {
679 680
			bootstrap(t, ctxT, dhts)

Steven Allen's avatar
Steven Allen committed
681
			// wait a bit.
682 683 684
			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
Steven Allen's avatar
Steven Allen committed
685
			case <-ctxT.Done():
686 687 688 689 690
				return
			}
		}
	}()

691
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
Steven Allen's avatar
Steven Allen committed
692
	cancelT()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
693

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
694 695
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
696 697 698 699
		printRoutingTables(dhts)
	}
}

700
func TestRefreshBelowMinRTThreshold(t *testing.T) {
Aarsh Shah's avatar
Aarsh Shah committed
701
	ctx := context.Background()
702 703 704 705 706 707 708 709 710 711 712 713

	// enable auto bootstrap on A
	dhtA, err := New(
		ctx,
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
		opts.Client(false),
		opts.NamespacedValidator("v", blankValidator{}),
	)
	if err != nil {
		t.Fatal(err)
	}

Aarsh Shah's avatar
Aarsh Shah committed
714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731
	dhtB := setupDHT(ctx, t, false)
	dhtC := setupDHT(ctx, t, false)

	defer func() {
		dhtA.Close()
		dhtA.host.Close()

		dhtB.Close()
		dhtB.host.Close()

		dhtC.Close()
		dhtC.host.Close()
	}()

	connect(t, ctx, dhtA, dhtB)
	connect(t, ctx, dhtB, dhtC)

	// we ONLY init bootstrap on A
732
	dhtA.RefreshRoutingTable()
Aarsh Shah's avatar
Aarsh Shah committed
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759
	// and wait for one round to complete i.e. A should be connected to both B & C
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)

	// now we create two new peers
	dhtD := setupDHT(ctx, t, false)
	dhtE := setupDHT(ctx, t, false)

	// connect them to each other
	connect(t, ctx, dhtD, dhtE)
	defer func() {
		dhtD.Close()
		dhtD.host.Close()

		dhtE.Close()
		dhtE.host.Close()
	}()

	// and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap.
	// since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens,
	// it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected)
	connect(t, ctx, dhtA, dhtD)

	// and because of the above bootstrap, A also discovers E !
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

760
func TestPeriodicRefresh(t *testing.T) {
761 762 763
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
764 765 766 767
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
768 769
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
770 771

	nDHTs := 30
772
	dhts := setupDHTS(t, ctx, nDHTs)
773 774 775 776 777 778 779
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

780
	t.Logf("dhts are not connected. %d", nDHTs)
781 782 783 784 785 786 787 788 789 790 791
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

792
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
793
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
794
		rtlen := dht.routingTable.Size()
795 796
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
797
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
798
	}
799

800 801 802 803
	if u.Debug {
		printRoutingTables(dhts)
	}

804
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Matt Joiner's avatar
Matt Joiner committed
805
	for _, dht := range dhts {
806
		dht.RefreshRoutingTable()
807
	}
808 809 810

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
811
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
812 813 814

	if u.Debug {
		printRoutingTables(dhts)
815
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
816 817
}

818 819
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Steven Allen's avatar
Steven Allen committed
820 821
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
822 823

	nDHTs := 40
824
	dhts := setupDHTS(t, ctx, nDHTs)
825 826 827
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
828
			defer dhts[i].host.Close()
829 830 831 832 833 834 835 836
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

837
	<-time.After(100 * time.Millisecond)
838
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
839 840
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
841 842
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
843 844 845 846 847 848 849 850
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
851
	}
852

853
	providers := make(map[cid.Cid]peer.ID)
854

855
	d := 0
856
	for _, c := range testCaseCids {
857 858
		d = (d + 1) % len(dhts)
		dht := dhts[d]
859
		providers[c] = dht.self
860

861
		t.Logf("announcing provider for %s", c)
Jeromy's avatar
Jeromy committed
862
		if err := dht.Provide(ctx, c, true); err != nil {
863 864
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
865 866
	}

867 868
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
869

870 871
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
872 873
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
874 875

	var wg sync.WaitGroup
876
	getProvider := func(dht *IpfsDHT, k cid.Cid) {
877
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
878

879
		expected := providers[k]
880

881 882 883
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
884 885
			actual := prov.ID
			if actual == "" {
886
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
887 888 889
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
890 891 892
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
893
		}
894 895
	}

896
	for _, c := range testCaseCids {
897 898
		// everyone should be able to find it...
		for _, dht := range dhts {
Matt Joiner's avatar
Matt Joiner committed
899
			logger.Debugf("getting providers for %s at %s", c, dht.self)
900
			wg.Add(1)
901
			go getProvider(dht, c)
902
		}
903 904 905 906 907 908 909 910 911 912
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
913 914 915
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
916
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
917
	// t.Skip("skipping test to debug another")
918 919 920
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
921

Steven Allen's avatar
Steven Allen committed
922 923
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
924

925
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
926 927
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
928
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
929
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
930 931 932
		}
	}()

933 934 935
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
936

Jeromy's avatar
Jeromy committed
937
	err := dhts[3].Provide(ctx, testCaseCids[0], true)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
938 939 940 941 942 943
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
944 945
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
946
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
947
	select {
Jeromy's avatar
Jeromy committed
948 949 950 951
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
952
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
953 954
			t.Fatal("Got back nil provider!")
		}
955
		if p.ID != dhts[3].self {
956
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
957
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
958
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
959 960 961 962
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
963
func TestLayeredGet(t *testing.T) {
964 965
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
966

967
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
968 969
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
970
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
971
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
972 973 974
		}
	}()

975 976
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
977
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
978

979
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
980 981 982 983
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
984
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
985

Jeromy's avatar
Jeromy committed
986 987
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
988 989 990
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
991
	}
992 993 994

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
995 996 997
	}
}

998 999 1000 1001 1002 1003 1004 1005
func TestUnfindablePeer(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1006
	dhts := setupDHTS(t, ctx, 4)
1007 1008 1009
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
1010
			dhts[i].Host().Close()
1011 1012 1013 1014 1015 1016 1017 1018
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[2], dhts[3])

	// Give DHT 1 a bad addr for DHT 2.
1019 1020
	dhts[1].host.Peerstore().ClearAddrs(dhts[2].PeerID())
	dhts[1].host.Peerstore().AddAddr(dhts[2].PeerID(), dhts[0].Host().Addrs()[0], time.Minute)
1021 1022 1023

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1024
	_, err := dhts[0].FindPeer(ctxT, dhts[3].PeerID())
1025 1026 1027 1028 1029 1030 1031 1032
	if err == nil {
		t.Error("should have failed to find peer")
	}
	if ctxT.Err() != nil {
		t.Error("FindPeer should have failed before context expired")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1033
func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1034
	// t.Skip("skipping test to debug another")
1035 1036 1037
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1038

Steven Allen's avatar
Steven Allen committed
1039 1040
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1041

1042
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1043 1044
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1045
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1046
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1047 1048 1049
		}
	}()

1050 1051 1052
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1053

Jeromy's avatar
Jeromy committed
1054 1055
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1056
	p, err := dhts[0].FindPeer(ctxT, dhts[2].PeerID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1057 1058 1059 1060
	if err != nil {
		t.Fatal(err)
	}

1061
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1062 1063 1064
		t.Fatal("Failed to find peer.")
	}

1065
	if p.ID != dhts[2].PeerID() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1066 1067 1068
		t.Fatal("Didnt find expected peer.")
	}
}
1069

1070
func TestFindPeersConnectedToPeer(t *testing.T) {
1071 1072
	t.Skip("not quite correct (see note)")

1073 1074 1075 1076
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
1077 1078
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
1079

1080
	dhts := setupDHTS(t, ctx, 4)
1081 1082 1083
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1084
			dhts[i].host.Close()
1085 1086 1087 1088 1089
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
1090 1091 1092 1093
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
1094 1095 1096 1097 1098 1099

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

Jeromy's avatar
Jeromy committed
1100 1101
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1102
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, dhts[2].PeerID())
1103 1104 1105 1106
	if err != nil {
		t.Fatal(err)
	}

1107
	// shouldFind := []peer.ID{peers[1], peers[3]}
1108
	var found []*peer.AddrInfo
1109 1110 1111 1112 1113 1114 1115 1116 1117 1118
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

Matt Joiner's avatar
Matt Joiner committed
1119
	logger.Warning("TestFindPeersConnectedToPeer is not quite correct")
1120 1121 1122 1123 1124
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

1125
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1126
	// t.Skip("skipping test to debug another")
1127 1128 1129
	if testing.Short() {
		t.SkipNow()
	}
1130 1131 1132
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
1133

1134
	runTimes := 10
1135

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1136
	for rtime := 0; rtime < runTimes; rtime++ {
Matt Joiner's avatar
Matt Joiner committed
1137
		logger.Info("Running Time: ", rtime)
1138

Steven Allen's avatar
Steven Allen committed
1139
		ctx, cancel := context.WithCancel(context.Background())
1140

1141 1142
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
1143

1144 1145
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1146

1147 1148
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1149

1150
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1151
		go func() {
1152 1153
			dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
1154
			err := dhtA.host.Connect(ctx, pi)
1155
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1156 1157
		}()
		go func() {
1158 1159
			dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
1160
			err := dhtB.host.Connect(ctx, pi)
1161
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1162 1163
		}()

1164
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1165
		select {
1166 1167 1168 1169
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1170 1171 1172 1173
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
1174 1175 1176 1177
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1178 1179 1180 1181
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1182 1183
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1184 1185
		dhtA.host.Close()
		dhtB.host.Close()
Steven Allen's avatar
Steven Allen committed
1186
		cancel()
Jeromy's avatar
Jeromy committed
1187
	}
1188
}
1189 1190 1191 1192 1193

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1194
	d := setupDHT(ctx, t, false)
1195 1196 1197 1198 1199 1200

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
1201

1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
func TestAtomicPut(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := setupDHT(ctx, t, false)
	d.Validator = testAtomicPutValidator{}

	// fnc to put a record
	key := "testkey"
	putRecord := func(value []byte) error {
		rec := record.MakePutRecord(key, value)
		pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
		pmes.Record = rec
		_, err := d.handlePutValue(ctx, "testpeer", pmes)
		return err
	}

	// put a valid record
	if err := putRecord([]byte("valid")); err != nil {
		t.Fatal("should not have errored on a valid record")
	}

	// simultaneous puts for old & new values
	values := [][]byte{[]byte("newer1"), []byte("newer7"), []byte("newer3"), []byte("newer5")}
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(v []byte) {
			defer wg.Done()
			putRecord(v)
		}(v)
	}
	wg.Wait()

	// get should return the newest value
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
	msg, err := d.handleGetValue(ctx, "testkey", pmes)
	if err != nil {
		t.Fatalf("should not have errored on final get, but got %+v", err)
	}
	if string(msg.GetRecord().Value) != "newer7" {
		t.Fatalf("Expected 'newer7' got '%s'", string(msg.GetRecord().Value))
	}
}

1247 1248 1249 1250 1251 1252 1253 1254 1255
func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

1256
	c := testCaseCids[0]
1257
	p := peer.ID("TestPeer")
1258 1259
	a.providers.AddProvider(ctx, c, p)
	time.Sleep(time.Millisecond * 5) // just in case...
1260

1261
	provs, err := b.FindProviders(ctx, c)
1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
Steven Allen's avatar
Steven Allen committed
1273 1274 1275 1276 1277 1278 1279 1280 1281
	if a.routingTable.Find(b.self) != "" {
		t.Fatal("DHT clients should not be added to routing tables")
	}
	if b.routingTable.Find(a.self) == "" {
		t.Fatal("DHT server should have been added to the dht client's routing table")
	}
}

func TestClientModeFindPeer(t *testing.T) {
1282
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Steven Allen's avatar
Steven Allen committed
1283 1284 1285 1286 1287 1288
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)
	c := setupDHT(ctx, t, true)

1289 1290
	connectNoSync(t, ctx, b, a)
	connectNoSync(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1291 1292

	// Can't use `connect` because b and c are only clients.
1293 1294
	wait(t, ctx, b, a)
	wait(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307

	pi, err := c.FindPeer(ctx, b.self)
	if err != nil {
		t.Fatal(err)
	}
	if len(pi.Addrs) == 0 {
		t.Fatal("should have found addresses for node b")
	}

	err = c.host.Connect(ctx, pi)
	if err != nil {
		t.Fatal(err)
	}
1308
}
1309

Matt Joiner's avatar
Matt Joiner committed
1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321
func minInt(a, b int) int {
	if a < b {
		return a
	} else {
		return b
	}
}

func TestFindPeerQueryMinimal(t *testing.T) {
	testFindPeerQuery(t, 2, 22, 11)
}

1322
func TestFindPeerQuery(t *testing.T) {
Matt Joiner's avatar
Matt Joiner committed
1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336
	if testing.Short() {
		t.Skip("skipping test in short mode")
	}
	if curFileLimit() < 1024 {
		t.Skip("insufficient file descriptors available")
	}
	testFindPeerQuery(t, 20, 80, 16)
}

func testFindPeerQuery(t *testing.T,
	bootstrappers, // Number of nodes connected to the querying node
	leafs, // Number of nodes that might be connected to from the bootstrappers
	bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
) {
1337 1338 1339
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1340
	dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs)
1341
	defer func() {
Matt Joiner's avatar
Matt Joiner committed
1342 1343 1344
		for _, d := range dhts {
			d.Close()
			d.host.Close()
1345 1346 1347
		}
	}()

Jeromy's avatar
Jeromy committed
1348
	mrand := rand.New(rand.NewSource(42))
1349 1350
	guy := dhts[0]
	others := dhts[1:]
Matt Joiner's avatar
Matt Joiner committed
1351 1352 1353 1354
	for i := 0; i < bootstrappers; i++ {
		for j := 0; j < bootstrapperLeafConns; j++ {
			v := mrand.Intn(leafs)
			connect(t, ctx, others[i], others[bootstrappers+v])
1355 1356 1357
		}
	}

Matt Joiner's avatar
Matt Joiner committed
1358
	for i := 0; i < bootstrappers; i++ {
1359 1360 1361
		connect(t, ctx, guy, others[i])
	}

Matt Joiner's avatar
Matt Joiner committed
1362 1363 1364 1365 1366
	var reachableIds []peer.ID
	for i, d := range dhts {
		lp := len(d.host.Network().Peers())
		//t.Log(i, lp)
		if i != 0 && lp > 0 {
1367
			reachableIds = append(reachableIds, d.PeerID())
Matt Joiner's avatar
Matt Joiner committed
1368 1369 1370 1371
		}
	}
	t.Logf("%d reachable ids", len(reachableIds))

1372 1373 1374
	val := "foobar"
	rtval := kb.ConvertKey(val)

Jeromy's avatar
Jeromy committed
1375
	rtablePeers := guy.routingTable.NearestPeers(rtval, AlphaValue)
Matt Joiner's avatar
Matt Joiner committed
1376
	assert.Len(t, rtablePeers, minInt(bootstrappers, AlphaValue))
1377

Matt Joiner's avatar
Matt Joiner committed
1378
	assert.Len(t, guy.host.Network().Peers(), bootstrappers)
1379 1380

	out, err := guy.GetClosestPeers(ctx, val)
Matt Joiner's avatar
Matt Joiner committed
1381
	require.NoError(t, err)
1382 1383 1384 1385 1386 1387

	var outpeers []peer.ID
	for p := range out {
		outpeers = append(outpeers, p)
	}

Jeromy's avatar
Jeromy committed
1388
	sort.Sort(peer.IDSlice(outpeers))
Steven Allen's avatar
Steven Allen committed
1389

Matt Joiner's avatar
Matt Joiner committed
1390 1391
	exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(KValue, len(reachableIds))]
	t.Logf("got %d peers", len(outpeers))
1392
	got := kb.SortClosestPeers(outpeers, rtval)
Jeromy's avatar
Jeromy committed
1393

Matt Joiner's avatar
Matt Joiner committed
1394
	assert.EqualValues(t, exp, got)
1395 1396
}

1397 1398 1399 1400 1401
func TestFindClosestPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 30
1402
	dhts := setupDHTS(t, ctx, nDHTs)
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

	peers, err := dhts[1].GetClosestPeers(ctx, "foo")
	if err != nil {
		t.Fatal(err)
	}

	var out []peer.ID
	for p := range peers {
		out = append(out, p)
	}

	if len(out) != KValue {
		t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), KValue)
	}
}
1429 1430

func TestGetSetPluggedProtocol(t *testing.T) {
1431 1432 1433
	t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
1434

1435 1436 1437 1438
		os := []opts.Option{
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1439
			opts.DisableAutoRefresh(),
1440
		}
1441

Steven Allen's avatar
Steven Allen committed
1442
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1443 1444 1445
		if err != nil {
			t.Fatal(err)
		}
1446

Steven Allen's avatar
Steven Allen committed
1447
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1448 1449 1450
		if err != nil {
			t.Fatal(err)
		}
1451

1452
		connect(t, ctx, dhtA, dhtB)
1453

1454
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
1455
		defer cancel()
1456 1457 1458
		if err := dhtA.PutValue(ctxT, "/v/cat", []byte("meow")); err != nil {
			t.Fatal(err)
		}
1459

1460 1461 1462 1463
		value, err := dhtB.GetValue(ctxT, "/v/cat")
		if err != nil {
			t.Fatal(err)
		}
1464

1465 1466 1467 1468 1469
		if string(value) != "meow" {
			t.Fatalf("Expected 'meow' got '%s'", string(value))
		}
	})

1470 1471
	t.Run("DHT routing table for peer A won't contain B if A and B don't use same protocol", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1472 1473
		defer cancel()

Steven Allen's avatar
Steven Allen committed
1474
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1475 1476 1477
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1478
			opts.DisableAutoRefresh(),
1479
		}...)
1480 1481 1482 1483
		if err != nil {
			t.Fatal(err)
		}

Steven Allen's avatar
Steven Allen committed
1484
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1485 1486 1487
			opts.Protocols("/lsr/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1488
			opts.DisableAutoRefresh(),
1489
		}...)
1490 1491 1492 1493
		if err != nil {
			t.Fatal(err)
		}

1494
		connectNoSync(t, ctx, dhtA, dhtB)
1495

1496 1497 1498 1499 1500 1501 1502
		// We don't expect connection notifications for A to reach B (or vice-versa), given
		// that they've been configured with different protocols - but we'll give them a
		// chance, anyhow.
		time.Sleep(time.Second * 2)

		err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1503
			t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
1504 1505
		}

1506 1507
		_, err = dhtB.GetValue(ctx, "/v/cat")
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1508
			t.Fatalf("get should not have been able to find any peers in routing table, err:'%v'", err)
1509
		}
1510
	})
1511
}
1512 1513 1514 1515 1516

func TestPing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds := setupDHTS(t, ctx, 2)
1517
	ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
1518 1519 1520 1521 1522 1523 1524 1525
	assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

func TestClientModeAtInit(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	pinger := setupDHT(ctx, t, false)
	client := setupDHT(ctx, t, true)
1526
	pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
1527 1528 1529
	err := pinger.Ping(context.Background(), client.PeerID())
	assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}