dht_test.go 35.7 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
8
	"math/rand"
9
	"sort"
10
	"strings"
11
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	"testing"
13
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14

15 16 17
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"
18
	"github.com/multiformats/go-multistream"
19 20 21 22 23 24

	"golang.org/x/xerrors"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

25 26
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
27

28
	"github.com/ipfs/go-cid"
29
	u "github.com/ipfs/go-ipfs-util"
30
	kb "github.com/libp2p/go-libp2p-kbucket"
31
	"github.com/libp2p/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
32
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
33
	"github.com/libp2p/go-libp2p-testing/ci"
34
	travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
Jeromy's avatar
Jeromy committed
35
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
36
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37 38
)

39
var testCaseCids []cid.Cid
40 41 42 43

func init() {
	for i := 0; i < 100; i++ {
		v := fmt.Sprintf("%d -- value", i)
44 45 46

		mhv := u.Hash([]byte(v))
		testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
47 48 49
	}
}

50 51 52 53 54 55 56 57 58 59
// blankValidator is a permissive validator: it accepts every record and
// always selects the first candidate.
type blankValidator struct{}

// Validate accepts any key/value pair.
func (blankValidator) Validate(_ string, _ []byte) error {
	return nil
}

// Select always reports index 0, regardless of the candidates supplied.
func (blankValidator) Select(_ string, _ [][]byte) (int, error) {
	return 0, nil
}

// testValidator is a toy validator driven by three magic values:
// "expired" fails validation, and during selection "newer" beats "valid".
type testValidator struct{}

// Select returns the index of the last "newer" record if there is one.
// Otherwise it returns the first "valid" record seen. If neither value is
// present it returns -1 and an error.
func (testValidator) Select(_ string, bs [][]byte) (int, error) {
	newer, valid := []byte("newer"), []byte("valid")

	chosen := -1
	for pos, rec := range bs {
		switch {
		case bytes.Equal(rec, newer):
			// A "newer" record always overrides the current choice.
			chosen = pos
		case bytes.Equal(rec, valid) && chosen == -1:
			// A "valid" record only counts when nothing was chosen yet.
			chosen = pos
		}
	}

	if chosen < 0 {
		return -1, errors.New("no rec found")
	}
	return chosen, nil
}

// Validate rejects the literal value "expired" and accepts everything else.
func (testValidator) Validate(_ string, b []byte) error {
	if !bytes.Equal(b, []byte("expired")) {
		return nil
	}
	return errors.New("expired")
}

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
// testAtomicPutValidator reuses testValidator's Validate but ranks records by
// their final byte when selecting.
type testAtomicPutValidator struct {
	testValidator
}

// Select picks the record whose last byte is the highest. A record equal to
// "valid" is used only as a fallback: it is chosen if nothing has been chosen
// before it, and any later record with a non-zero last byte can replace it.
// Empty records carry no last byte to rank and are skipped instead of
// triggering an index-out-of-range panic. Select returns -1 and an error when
// no record qualifies.
func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {
	index := -1
	highest := uint8(0)
	for i, b := range bs {
		if bytes.Equal(b, []byte("valid")) {
			if index == -1 {
				index = i
			}
			continue
		}

		// Nothing to compare for an empty record.
		if len(b) == 0 {
			continue
		}

		if last := b[len(b)-1]; last > highest {
			highest = last
			index = i
		}
	}
	if index == -1 {
		return -1, errors.New("no rec found")
	}
	return index, nil
}

110
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
111 112
	d, err := New(
		ctx,
Steven Allen's avatar
Steven Allen committed
113
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
114 115
		opts.Client(client),
		opts.NamespacedValidator("v", blankValidator{}),
116
		opts.DisableAutoRefresh(),
117 118 119
	)
	if err != nil {
		t.Fatal(err)
120
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123
	return d
}

124
func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
125
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126
	dhts := make([]*IpfsDHT, n)
127 128
	peers := make([]peer.ID, n)

129 130 131
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
	for i := 0; i < n; i++ {
133
		dhts[i] = setupDHT(ctx, t, false)
134 135
		peers[i] = dhts[i].PeerID()
		addrs[i] = dhts[i].host.Addrs()[0]
136 137

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
138
			t.Fatal("While setting up DHTs address got duplicated.")
139 140 141 142
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
143
			t.Fatal("While setting up DHTs peerid got duplicated.")
144 145 146
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147 148
	}

149
	return dhts
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151
}

152
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
153 154
	t.Helper()

155
	idB := b.self
156
	addrB := b.peerstore.Addrs(idB)
157 158
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	}
160

161 162
	a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
	pi := peer.AddrInfo{ID: idB}
Jeromy's avatar
Jeromy committed
163
	if err := a.host.Connect(ctx, pi); err != nil {
164
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	}
166 167
}

168 169
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
170

171 172
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
173
	for a.routingTable.Find(b.self) == "" {
174 175 176 177 178
		select {
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		case <-time.After(time.Millisecond * 5):
		}
179
	}
180
}
181

182 183 184 185 186
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
	connectNoSync(t, ctx, a, b)
	wait(t, ctx, a, b)
	wait(t, ctx, b, a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
190

191
	ctx, cancel := context.WithCancel(ctx)
Steven Allen's avatar
Steven Allen committed
192 193
	defer cancel()

194
	logger.Debugf("refreshing DHTs routing tables...")
195 196 197 198 199 200 201 202 203

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
204
		dht.RefreshRoutingTable()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205 206 207
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
func TestValueGetSet(t *testing.T) {
209 210
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
211

Steven Allen's avatar
Steven Allen committed
212
	var dhts [5]*IpfsDHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213

Steven Allen's avatar
Steven Allen committed
214 215 216 217 218
	for i := range dhts {
		dhts[i] = setupDHT(ctx, t, false)
		defer dhts[i].Close()
		defer dhts[i].host.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219

Steven Allen's avatar
Steven Allen committed
220
	connect(t, ctx, dhts[0], dhts[1])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221

Steven Allen's avatar
Steven Allen committed
222
	t.Log("adding value on: ", dhts[0].self)
Jeromy's avatar
Jeromy committed
223 224
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
225
	err := dhts[0].PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227 228 229
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
230
	t.Log("requesting value on dhts: ", dhts[1].self)
Jeromy's avatar
Jeromy committed
231 232
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

	val, err := dhts[1].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	// late connect

	connect(t, ctx, dhts[2], dhts[0])
	connect(t, ctx, dhts[2], dhts[1])

	t.Log("requesting value (offline) on dhts: ", dhts[2].self)
	vala, err := dhts[2].GetValue(ctxT, "/v/hello", Quorum(0))
	if vala != nil {
		t.Fatalf("offline get should have failed, got %s", string(vala))
	}
	if err != routing.ErrNotFound {
		t.Fatalf("offline get should have failed with ErrNotFound, got: %s", err)
	}

	t.Log("requesting value (online) on dhts: ", dhts[2].self)
	val, err = dhts[2].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	for _, d := range dhts[:3] {
		connect(t, ctx, dhts[3], d)
	}
	connect(t, ctx, dhts[4], dhts[3])

	t.Log("requesting value (requires peer routing) on dhts: ", dhts[4].self)
	val, err = dhts[4].GetValue(ctxT, "/v/hello")
274 275 276 277
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
278 279
	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
280
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281 282
}

283 284 285 286 287 288 289 290 291 292 293 294
func TestValueSetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

295
	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
	dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, failset bool, exp string, experr error) {
		t.Helper()

		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if failset {
			if err == nil {
				t.Error("expected set to fail")
			}
		} else {
			if err != nil {
				t.Error(err)
			}
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
			t.Errorf("Set/Get %v: Expected %v error but got %v", val, experr, err)
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
		}
	}

	// Expired records should not be set
	testSetGet("expired", true, "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", false, "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", false, "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", true, "newer", nil)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
func TestSearchValue(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
363
	valCh, err := dhtA.SearchValue(ctxT, "/v/hello", Quorum(-1))
364 365 366
	if err != nil {
		t.Fatal(err)
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391

	select {
	case v := <-valCh:
		if string(v) != "valid" {
			t.Errorf("expected 'valid', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}

	err = dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	select {
	case v := <-valCh:
		if string(v) != "newer" {
			t.Errorf("expected 'newer', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}
}

392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
func TestGetValues(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
	vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
	if err != nil {
		t.Fatal(err)
	}

	if len(vals) != 2 {
		t.Fatalf("expected to get 2 values, got %d", len(vals))
	}

430
	sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })
431 432 433 434 435 436 437 438 439

	if string(vals[0].Val) != "valid" {
		t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
	}
	if string(vals[1].Val) != "valid" {
		t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
	}
}

440 441 442 443 444 445 446 447 448 449 450 451 452
func TestValueGetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
453
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
454 455 456 457

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, exp string, experr error) {
458 459
		t.Helper()

460 461 462 463
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if err != nil {
464
			t.Error(err)
465 466 467 468 469 470
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
Łukasz Magiera's avatar
Łukasz Magiera committed
471
			t.Errorf("Set/Get %v: Expected '%v' error but got '%v'", val, experr, err)
472 473
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
474 475 476 477 478 479 480 481 482 483 484 485 486
		}
	}

	// Expired records should not be returned
	testSetGet("expired", "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", "newer", nil)
}

487
func TestInvalidMessageSenderTracking(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
488 489 490
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

491
	dht := setupDHT(ctx, t, false)
Steven Allen's avatar
Steven Allen committed
492 493
	defer dht.Close()

494
	foo := peer.ID("asdasd")
Steven Allen's avatar
Steven Allen committed
495
	_, err := dht.messageSenderForPeer(ctx, foo)
496 497 498 499 500
	if err == nil {
		t.Fatal("that shouldnt have succeeded")
	}

	dht.smlk.Lock()
Steven Allen's avatar
Steven Allen committed
501 502 503 504
	mscnt := len(dht.strmap)
	dht.smlk.Unlock()

	if mscnt > 0 {
505 506 507 508
		t.Fatal("should have no message senders in map")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
509 510
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
511 512
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
513

514
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515 516
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
517
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
518
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
519 520 521
		}
	}()

522 523 524
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
525

526
	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
527
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
528
		if err := dhts[3].Provide(ctx, k, true); err != nil {
529 530
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
531 532
	}

533 534 535 536
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
537
	for _, c := range testCaseCids {
538 539
		n = (n + 1) % 3

Matt Joiner's avatar
Matt Joiner committed
540
		logger.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
541 542
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
543
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

Jeromy's avatar
Jeromy committed
559 560
func TestLocalProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
561 562
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Jeromy's avatar
Jeromy committed
563

564
	dhts := setupDHTS(t, ctx, 4)
Jeromy's avatar
Jeromy committed
565 566 567 568 569 570 571 572 573 574 575 576
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
577
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
		if err := dhts[3].Provide(ctx, k, false); err != nil {
			t.Fatal(err)
		}
	}

	time.Sleep(time.Millisecond * 10)

	for _, c := range testCaseCids {
		for i := 0; i < 3; i++ {
			provs := dhts[i].providers.GetProviders(ctx, c)
			if len(provs) > 0 {
				t.Fatal("shouldnt know this")
			}
		}
	}
}

595 596 597 598 599 600 601 602 603 604
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
605
				//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
Matt Joiner's avatar
Matt Joiner committed
622
			logger.Debugf("did not reach well-formed routing tables by %s", timeout)
623 624 625 626 627 628 629 630 631 632 633
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
634
	fmt.Printf("checking routing table of %d\n", len(dhts))
635 636 637 638 639 640 641
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

642
func TestRefresh(t *testing.T) {
643 644 645 646
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
647 648
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
649

650
	nDHTs := 30
651
	dhts := setupDHTS(t, ctx, nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
652 653 654
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
655
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
656 657 658 659 660 661 662 663
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

664
	<-time.After(100 * time.Millisecond)
665 666 667 668
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
669
			t.Logf("bootstrapping them so they find each other %d", nDHTs)
Jeromy's avatar
Jeromy committed
670 671
			ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
672 673 674 675 676 677 678 679 680 681 682
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

683
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
684
	close(stop)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
685

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
686 687
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
688 689 690 691
		printRoutingTables(dhts)
	}
}

692
func TestRefreshBelowMinRTThreshold(t *testing.T) {
Aarsh Shah's avatar
Aarsh Shah committed
693
	ctx := context.Background()
694 695 696 697 698 699 700 701 702 703 704 705

	// enable auto bootstrap on A
	dhtA, err := New(
		ctx,
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
		opts.Client(false),
		opts.NamespacedValidator("v", blankValidator{}),
	)
	if err != nil {
		t.Fatal(err)
	}

Aarsh Shah's avatar
Aarsh Shah committed
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723
	dhtB := setupDHT(ctx, t, false)
	dhtC := setupDHT(ctx, t, false)

	defer func() {
		dhtA.Close()
		dhtA.host.Close()

		dhtB.Close()
		dhtB.host.Close()

		dhtC.Close()
		dhtC.host.Close()
	}()

	connect(t, ctx, dhtA, dhtB)
	connect(t, ctx, dhtB, dhtC)

	// we ONLY init bootstrap on A
724
	dhtA.RefreshRoutingTable()
Aarsh Shah's avatar
Aarsh Shah committed
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751
	// and wait for one round to complete i.e. A should be connected to both B & C
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)

	// now we create two new peers
	dhtD := setupDHT(ctx, t, false)
	dhtE := setupDHT(ctx, t, false)

	// connect them to each other
	connect(t, ctx, dhtD, dhtE)
	defer func() {
		dhtD.Close()
		dhtD.host.Close()

		dhtE.Close()
		dhtE.host.Close()
	}()

	// and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap.
	// since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens,
	// it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected)
	connect(t, ctx, dhtA, dhtD)

	// and because of the above bootstrap, A also discovers E !
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

752
func TestPeriodicRefresh(t *testing.T) {
753 754 755
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
756 757 758 759
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
760 761
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
762 763

	nDHTs := 30
764
	dhts := setupDHTS(t, ctx, nDHTs)
765 766 767 768 769 770 771
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

772
	t.Logf("dhts are not connected. %d", nDHTs)
773 774 775 776 777 778 779 780 781 782 783
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

784
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
785
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
786
		rtlen := dht.routingTable.Size()
787 788
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
789
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
790
	}
791

792 793 794 795
	if u.Debug {
		printRoutingTables(dhts)
	}

796
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Matt Joiner's avatar
Matt Joiner committed
797
	for _, dht := range dhts {
798
		dht.RefreshRoutingTable()
799
	}
800 801 802

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
803
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
804 805 806

	if u.Debug {
		printRoutingTables(dhts)
807
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
808 809
}

810 811
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Steven Allen's avatar
Steven Allen committed
812 813
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
814 815

	nDHTs := 40
816
	dhts := setupDHTS(t, ctx, nDHTs)
817 818 819
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
820
			defer dhts[i].host.Close()
821 822 823 824 825 826 827 828
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

829
	<-time.After(100 * time.Millisecond)
830
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
831 832
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
833 834
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
835 836 837 838 839 840 841 842
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
843
	}
844

845
	providers := make(map[cid.Cid]peer.ID)
846

847
	d := 0
848
	for _, c := range testCaseCids {
849 850
		d = (d + 1) % len(dhts)
		dht := dhts[d]
851
		providers[c] = dht.self
852

853
		t.Logf("announcing provider for %s", c)
Jeromy's avatar
Jeromy committed
854
		if err := dht.Provide(ctx, c, true); err != nil {
855 856
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
857 858
	}

859 860
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
861

862 863
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
864 865
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
866 867

	var wg sync.WaitGroup
868
	getProvider := func(dht *IpfsDHT, k cid.Cid) {
869
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
870

871
		expected := providers[k]
872

873 874 875
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
876 877
			actual := prov.ID
			if actual == "" {
878
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
879 880 881
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
882 883 884
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
885
		}
886 887
	}

888
	for _, c := range testCaseCids {
889 890
		// everyone should be able to find it...
		for _, dht := range dhts {
Matt Joiner's avatar
Matt Joiner committed
891
			logger.Debugf("getting providers for %s at %s", c, dht.self)
892
			wg.Add(1)
893
			go getProvider(dht, c)
894
		}
895 896 897 898 899 900 901 902 903 904
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
905 906 907
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
908
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
909
	// t.Skip("skipping test to debug another")
910 911 912
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
913

Steven Allen's avatar
Steven Allen committed
914 915
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
916

917
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
918 919
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
920
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
921
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
922 923 924
		}
	}()

925 926 927
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
928

Jeromy's avatar
Jeromy committed
929
	err := dhts[3].Provide(ctx, testCaseCids[0], true)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
930 931 932 933 934 935
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
936 937
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
938
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
939
	select {
Jeromy's avatar
Jeromy committed
940 941 942 943
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
944
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
945 946
			t.Fatal("Got back nil provider!")
		}
947
		if p.ID != dhts[3].self {
948
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
949
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
950
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
951 952 953 954
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
955
func TestLayeredGet(t *testing.T) {
956 957
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
958

959
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
960 961
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
962
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
963
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
964 965 966
		}
	}()

967 968
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
969
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
970

971
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
972 973 974 975
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
976
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
977

Jeromy's avatar
Jeromy committed
978 979
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
980 981 982
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
983
	}
984 985 986

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
987 988 989
	}
}

990 991 992 993 994 995 996 997
func TestUnfindablePeer(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

998
	dhts := setupDHTS(t, ctx, 4)
999 1000 1001
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
1002
			dhts[i].Host().Close()
1003 1004 1005 1006 1007 1008 1009 1010
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[2], dhts[3])

	// Give DHT 1 a bad addr for DHT 2.
1011 1012
	dhts[1].host.Peerstore().ClearAddrs(dhts[2].PeerID())
	dhts[1].host.Peerstore().AddAddr(dhts[2].PeerID(), dhts[0].Host().Addrs()[0], time.Minute)
1013 1014 1015

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1016
	_, err := dhts[0].FindPeer(ctxT, dhts[3].PeerID())
1017 1018 1019 1020 1021 1022 1023 1024
	if err == nil {
		t.Error("should have failed to find peer")
	}
	if ctxT.Err() != nil {
		t.Error("FindPeer should have failed before context expired")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1025
func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1026
	// t.Skip("skipping test to debug another")
1027 1028 1029
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1030

Steven Allen's avatar
Steven Allen committed
1031 1032
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1033

1034
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1035 1036
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1037
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1038
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1039 1040 1041
		}
	}()

1042 1043 1044
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1045

Jeromy's avatar
Jeromy committed
1046 1047
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1048
	p, err := dhts[0].FindPeer(ctxT, dhts[2].PeerID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1049 1050 1051 1052
	if err != nil {
		t.Fatal(err)
	}

1053
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1054 1055 1056
		t.Fatal("Failed to find peer.")
	}

1057
	if p.ID != dhts[2].PeerID() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1058 1059 1060
		t.Fatal("Didnt find expected peer.")
	}
}
1061

1062
func TestFindPeersConnectedToPeer(t *testing.T) {
1063 1064
	t.Skip("not quite correct (see note)")

1065 1066 1067 1068
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
1069 1070
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
1071

1072
	dhts := setupDHTS(t, ctx, 4)
1073 1074 1075
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1076
			dhts[i].host.Close()
1077 1078 1079 1080 1081
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
1082 1083 1084 1085
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
1086 1087 1088 1089 1090 1091

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

Jeromy's avatar
Jeromy committed
1092 1093
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1094
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, dhts[2].PeerID())
1095 1096 1097 1098
	if err != nil {
		t.Fatal(err)
	}

1099
	// shouldFind := []peer.ID{peers[1], peers[3]}
1100
	var found []*peer.AddrInfo
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

Matt Joiner's avatar
Matt Joiner committed
1111
	logger.Warning("TestFindPeersConnectedToPeer is not quite correct")
1112 1113 1114 1115 1116
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

1117
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1118
	// t.Skip("skipping test to debug another")
1119 1120 1121
	if testing.Short() {
		t.SkipNow()
	}
1122 1123 1124
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
1125

1126
	runTimes := 10
1127

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1128
	for rtime := 0; rtime < runTimes; rtime++ {
Matt Joiner's avatar
Matt Joiner committed
1129
		logger.Info("Running Time: ", rtime)
1130

Steven Allen's avatar
Steven Allen committed
1131
		ctx, cancel := context.WithCancel(context.Background())
1132

1133 1134
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
1135

1136 1137
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1138

1139 1140
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1141

1142
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1143
		go func() {
1144 1145
			dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
1146
			err := dhtA.host.Connect(ctx, pi)
1147
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1148 1149
		}()
		go func() {
1150 1151
			dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
1152
			err := dhtB.host.Connect(ctx, pi)
1153
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1154 1155
		}()

1156
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1157
		select {
1158 1159 1160 1161
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1162 1163 1164 1165
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
1166 1167 1168 1169
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1170 1171 1172 1173
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1174 1175
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1176 1177
		dhtA.host.Close()
		dhtB.host.Close()
Steven Allen's avatar
Steven Allen committed
1178
		cancel()
Jeromy's avatar
Jeromy committed
1179
	}
1180
}
1181 1182 1183 1184 1185

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1186
	d := setupDHT(ctx, t, false)
1187 1188 1189 1190 1191 1192

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
1193

1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238
// TestAtomicPut feeds PUT_VALUE messages straight into handlePutValue, first a
// single record and then several concurrently, and checks which value
// handleGetValue reports once all puts have finished.
func TestAtomicPut(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := setupDHT(ctx, t, false)
	d.Validator = testAtomicPutValidator{}

	const key = "testkey"

	// store wraps value in a put record for key and passes it to the handler.
	store := func(value []byte) error {
		rec := record.MakePutRecord(key, value)
		req := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
		req.Record = rec
		_, err := d.handlePutValue(ctx, "testpeer", req)
		return err
	}

	// Seed the node with an initial record.
	if err := store([]byte("valid")); err != nil {
		t.Fatal("should not have errored on a valid record")
	}

	// Issue the competing puts simultaneously. Their individual results are
	// deliberately not checked; only the final stored value matters here.
	candidates := []string{"newer1", "newer7", "newer3", "newer5"}
	var wg sync.WaitGroup
	wg.Add(len(candidates))
	for _, c := range candidates {
		go func(v []byte) {
			defer wg.Done()
			store(v)
		}([]byte(c))
	}
	wg.Wait()

	// The value that survives the race is expected to be "newer7".
	req := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
	resp, err := d.handleGetValue(ctx, "testkey", req)
	if err != nil {
		t.Fatalf("should not have errored on final get, but got %+v", err)
	}
	if got := string(resp.GetRecord().Value); got != "newer7" {
		t.Fatalf("Expected 'newer7' got '%s'", got)
	}
}

1239 1240 1241 1242 1243 1244 1245 1246 1247
func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

1248
	c := testCaseCids[0]
1249
	p := peer.ID("TestPeer")
1250 1251
	a.providers.AddProvider(ctx, c, p)
	time.Sleep(time.Millisecond * 5) // just in case...
1252

1253
	provs, err := b.FindProviders(ctx, c)
1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
Steven Allen's avatar
Steven Allen committed
1265 1266 1267 1268 1269 1270 1271 1272 1273
	if a.routingTable.Find(b.self) != "" {
		t.Fatal("DHT clients should not be added to routing tables")
	}
	if b.routingTable.Find(a.self) == "" {
		t.Fatal("DHT server should have been added to the dht client's routing table")
	}
}

func TestClientModeFindPeer(t *testing.T) {
1274
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Steven Allen's avatar
Steven Allen committed
1275 1276 1277 1278 1279 1280
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)
	c := setupDHT(ctx, t, true)

1281 1282
	connectNoSync(t, ctx, b, a)
	connectNoSync(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1283 1284

	// Can't use `connect` because b and c are only clients.
1285 1286
	wait(t, ctx, b, a)
	wait(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299

	pi, err := c.FindPeer(ctx, b.self)
	if err != nil {
		t.Fatal(err)
	}
	if len(pi.Addrs) == 0 {
		t.Fatal("should have found addresses for node b")
	}

	err = c.host.Connect(ctx, pi)
	if err != nil {
		t.Fatal(err)
	}
1300
}
1301

Matt Joiner's avatar
Matt Joiner committed
1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313
// minInt returns the smaller of a and b; when they are equal it returns b,
// which is the same value.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// TestFindPeerQueryMinimal runs testFindPeerQuery on a small network:
// 2 bootstrappers, 22 leaf nodes and 11 leaf connections per bootstrapper.
// Unlike TestFindPeerQuery it has no short-mode or file-descriptor guard.
func TestFindPeerQueryMinimal(t *testing.T) {
	testFindPeerQuery(t, 2, 22, 11)
}

1314
func TestFindPeerQuery(t *testing.T) {
Matt Joiner's avatar
Matt Joiner committed
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328
	if testing.Short() {
		t.Skip("skipping test in short mode")
	}
	if curFileLimit() < 1024 {
		t.Skip("insufficient file descriptors available")
	}
	testFindPeerQuery(t, 20, 80, 16)
}

func testFindPeerQuery(t *testing.T,
	bootstrappers, // Number of nodes connected to the querying node
	leafs, // Number of nodes that might be connected to from the bootstrappers
	bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
) {
1329 1330 1331
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1332
	dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs)
1333
	defer func() {
Matt Joiner's avatar
Matt Joiner committed
1334 1335 1336
		for _, d := range dhts {
			d.Close()
			d.host.Close()
1337 1338 1339
		}
	}()

Jeromy's avatar
Jeromy committed
1340
	mrand := rand.New(rand.NewSource(42))
1341 1342
	guy := dhts[0]
	others := dhts[1:]
Matt Joiner's avatar
Matt Joiner committed
1343 1344 1345 1346
	for i := 0; i < bootstrappers; i++ {
		for j := 0; j < bootstrapperLeafConns; j++ {
			v := mrand.Intn(leafs)
			connect(t, ctx, others[i], others[bootstrappers+v])
1347 1348 1349
		}
	}

Matt Joiner's avatar
Matt Joiner committed
1350
	for i := 0; i < bootstrappers; i++ {
1351 1352 1353
		connect(t, ctx, guy, others[i])
	}

Matt Joiner's avatar
Matt Joiner committed
1354 1355 1356 1357 1358
	var reachableIds []peer.ID
	for i, d := range dhts {
		lp := len(d.host.Network().Peers())
		//t.Log(i, lp)
		if i != 0 && lp > 0 {
1359
			reachableIds = append(reachableIds, d.PeerID())
Matt Joiner's avatar
Matt Joiner committed
1360 1361 1362 1363
		}
	}
	t.Logf("%d reachable ids", len(reachableIds))

1364 1365 1366
	val := "foobar"
	rtval := kb.ConvertKey(val)

Jeromy's avatar
Jeromy committed
1367
	rtablePeers := guy.routingTable.NearestPeers(rtval, AlphaValue)
Matt Joiner's avatar
Matt Joiner committed
1368
	assert.Len(t, rtablePeers, minInt(bootstrappers, AlphaValue))
1369

Matt Joiner's avatar
Matt Joiner committed
1370
	assert.Len(t, guy.host.Network().Peers(), bootstrappers)
1371 1372

	out, err := guy.GetClosestPeers(ctx, val)
Matt Joiner's avatar
Matt Joiner committed
1373
	require.NoError(t, err)
1374 1375 1376 1377 1378 1379

	var outpeers []peer.ID
	for p := range out {
		outpeers = append(outpeers, p)
	}

Jeromy's avatar
Jeromy committed
1380
	sort.Sort(peer.IDSlice(outpeers))
Steven Allen's avatar
Steven Allen committed
1381

Matt Joiner's avatar
Matt Joiner committed
1382 1383
	exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(KValue, len(reachableIds))]
	t.Logf("got %d peers", len(outpeers))
1384
	got := kb.SortClosestPeers(outpeers, rtval)
Jeromy's avatar
Jeromy committed
1385

Matt Joiner's avatar
Matt Joiner committed
1386
	assert.EqualValues(t, exp, got)
1387 1388
}

1389 1390 1391 1392 1393
func TestFindClosestPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 30
1394
	dhts := setupDHTS(t, ctx, nDHTs)
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

	peers, err := dhts[1].GetClosestPeers(ctx, "foo")
	if err != nil {
		t.Fatal(err)
	}

	var out []peer.ID
	for p := range peers {
		out = append(out, p)
	}

	if len(out) != KValue {
		t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), KValue)
	}
}
1421 1422

func TestGetSetPluggedProtocol(t *testing.T) {
1423 1424 1425
	t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
1426

1427 1428 1429 1430
		os := []opts.Option{
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1431
			opts.DisableAutoRefresh(),
1432
		}
1433

Steven Allen's avatar
Steven Allen committed
1434
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1435 1436 1437
		if err != nil {
			t.Fatal(err)
		}
1438

Steven Allen's avatar
Steven Allen committed
1439
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1440 1441 1442
		if err != nil {
			t.Fatal(err)
		}
1443

1444
		connect(t, ctx, dhtA, dhtB)
1445

1446
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
1447
		defer cancel()
1448 1449 1450
		if err := dhtA.PutValue(ctxT, "/v/cat", []byte("meow")); err != nil {
			t.Fatal(err)
		}
1451

1452 1453 1454 1455
		value, err := dhtB.GetValue(ctxT, "/v/cat")
		if err != nil {
			t.Fatal(err)
		}
1456

1457 1458 1459 1460 1461
		if string(value) != "meow" {
			t.Fatalf("Expected 'meow' got '%s'", string(value))
		}
	})

1462 1463
	t.Run("DHT routing table for peer A won't contain B if A and B don't use same protocol", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1464 1465
		defer cancel()

Steven Allen's avatar
Steven Allen committed
1466
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1467 1468 1469
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1470
			opts.DisableAutoRefresh(),
1471
		}...)
1472 1473 1474 1475
		if err != nil {
			t.Fatal(err)
		}

Steven Allen's avatar
Steven Allen committed
1476
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1477 1478 1479
			opts.Protocols("/lsr/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1480
			opts.DisableAutoRefresh(),
1481
		}...)
1482 1483 1484 1485
		if err != nil {
			t.Fatal(err)
		}

1486
		connectNoSync(t, ctx, dhtA, dhtB)
1487

1488 1489 1490 1491 1492 1493 1494
		// We don't expect connection notifications for A to reach B (or vice-versa), given
		// that they've been configured with different protocols - but we'll give them a
		// chance, anyhow.
		time.Sleep(time.Second * 2)

		err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1495
			t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
1496 1497
		}

1498 1499
		_, err = dhtB.GetValue(ctx, "/v/cat")
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1500
			t.Fatalf("get should not have been able to find any peers in routing table, err:'%v'", err)
1501
		}
1502
	})
1503
}
1504 1505 1506 1507 1508

func TestPing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds := setupDHTS(t, ctx, 2)
1509
	ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
1510 1511 1512 1513 1514 1515 1516 1517
	assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

func TestClientModeAtInit(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	pinger := setupDHT(ctx, t, false)
	client := setupDHT(ctx, t, true)
1518
	pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
1519 1520 1521
	err := pinger.Ping(context.Background(), client.PeerID())
	assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}