dht_test.go 50.4 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"encoding/binary"
7
	"errors"
8
	"fmt"
9
	"math/rand"
10
	"sort"
11
	"strings"
12
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	"testing"
14
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15

16 17
	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/network"
18 19 20
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"
21

Will Scott's avatar
Will Scott committed
22
	test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
Adin Schmahmann's avatar
go fmt  
Adin Schmahmann committed
23
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
24
	kb "github.com/libp2p/go-libp2p-kbucket"
25
	record "github.com/libp2p/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
26
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
Jeromy's avatar
Jeromy committed
27
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
28

29
	"github.com/ipfs/go-cid"
30
	detectrace "github.com/ipfs/go-detect-race"
31 32 33 34 35 36 37
	u "github.com/ipfs/go-ipfs-util"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/multiformats/go-multihash"
	"github.com/multiformats/go-multistream"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39
)

40
var testCaseCids []cid.Cid
41 42 43 44

func init() {
	for i := 0; i < 100; i++ {
		v := fmt.Sprintf("%d -- value", i)
45

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
		var newCid cid.Cid
		switch i % 3 {
		case 0:
			mhv := u.Hash([]byte(v))
			newCid = cid.NewCidV0(mhv)
		case 1:
			mhv := u.Hash([]byte(v))
			newCid = cid.NewCidV1(cid.DagCBOR, mhv)
		case 2:
			rawMh := make([]byte, 12)
			binary.PutUvarint(rawMh, cid.Raw)
			binary.PutUvarint(rawMh[1:], 10)
			copy(rawMh[2:], []byte(v)[:10])
			_, mhv, err := multihash.MHFromBytes(rawMh)
			if err != nil {
				panic(err)
			}
			newCid = cid.NewCidV1(cid.Raw, mhv)
		}
		testCaseCids = append(testCaseCids, newCid)
66 67 68
	}
}

69 70 71 72 73
// blankValidator is a record validator that accepts every record and always
// prefers the first candidate.
type blankValidator struct{}

// Validate reports every key/value pair as valid.
func (blankValidator) Validate(string, []byte) error {
	return nil
}

// Select always chooses index 0 and never fails.
func (blankValidator) Select(string, [][]byte) (int, error) {
	return 0, nil
}

74
type testAtomicPutValidator struct {
Will Scott's avatar
Will Scott committed
75
	test.TestValidator
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
}

// selects the entry with the 'highest' last byte
func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {
	index := -1
	max := uint8(0)
	for i, b := range bs {
		if bytes.Equal(b, []byte("valid")) {
			if index == -1 {
				index = i
			}
			continue
		}

		str := string(b)
		n := str[len(str)-1]
		if n > max {
			max = n
			index = i
		}

	}
	if index == -1 {
		return -1, errors.New("no rec found")
	}
	return index, nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
104 105
// testPrefix is the "/test" protocol-prefix option passed to New when the
// tests in this file construct a DHT.
var testPrefix = ProtocolPrefix("/test")

106 107
func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) *IpfsDHT {
	baseOpts := []Option{
Adin Schmahmann's avatar
Adin Schmahmann committed
108
		testPrefix,
109 110
		NamespacedValidator("v", blankValidator{}),
		DisableAutoRefresh(),
111 112 113
	}

	if client {
114
		baseOpts = append(baseOpts, Mode(ModeClient))
115
	} else {
116
		baseOpts = append(baseOpts, Mode(ModeServer))
117 118
	}

119 120
	d, err := New(
		ctx,
Steven Allen's avatar
Steven Allen committed
121
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
122
		append(baseOpts, options...)...,
123 124 125
	)
	if err != nil {
		t.Fatal(err)
126
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128 129
	return d
}

Steven Allen's avatar
Steven Allen committed
130
func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*IpfsDHT {
131
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
	dhts := make([]*IpfsDHT, n)
133 134
	peers := make([]peer.ID, n)

135 136 137
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138
	for i := 0; i < n; i++ {
Steven Allen's avatar
Steven Allen committed
139
		dhts[i] = setupDHT(ctx, t, false, options...)
140 141
		peers[i] = dhts[i].PeerID()
		addrs[i] = dhts[i].host.Addrs()[0]
142 143

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
144
			t.Fatal("While setting up DHTs address got duplicated.")
145 146 147 148
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
149
			t.Fatal("While setting up DHTs peerid got duplicated.")
150 151 152
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154
	}

155
	return dhts
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157
}

158
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
159 160
	t.Helper()

161
	idB := b.self
162
	addrB := b.peerstore.Addrs(idB)
163 164
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	}
166

167 168
	a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
	pi := peer.AddrInfo{ID: idB}
Jeromy's avatar
Jeromy committed
169
	if err := a.host.Connect(ctx, pi); err != nil {
170
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
	}
172 173
}

174 175
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
176

177 178
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
179
	for a.routingTable.Find(b.self) == "" {
180 181 182 183 184
		select {
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		case <-time.After(time.Millisecond * 5):
		}
185
	}
186
}
187

188 189 190 191 192
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
	connectNoSync(t, ctx, a, b)
	wait(t, ctx, a, b)
	wait(t, ctx, b, a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
196
	ctx, cancel := context.WithCancel(ctx)
Steven Allen's avatar
Steven Allen committed
197 198
	defer cancel()

199
	logger.Debugf("refreshing DHTs routing tables...")
200 201 202 203 204 205 206 207 208

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
209 210 211 212 213 214 215 216
		select {
		case err := <-dht.RefreshRoutingTable():
			if err != nil {
				t.Error(err)
			}
		case <-ctx.Done():
			return
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217 218 219
	}
}

220 221
// Check to make sure we always signal the RefreshRoutingTable channel.
func TestRefreshMultiple(t *testing.T) {
Adin Schmahmann's avatar
Adin Schmahmann committed
222 223
	// TODO: What's with this test? How long should it take and why does RefreshRoutingTable not take a context?
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
	defer cancel()

	dhts := setupDHTS(t, ctx, 5)
	defer func() {
		for _, dht := range dhts {
			dht.Close()
			defer dht.host.Close()
		}
	}()

	for _, dht := range dhts[1:] {
		connect(t, ctx, dhts[0], dht)
	}

	a := dhts[0].RefreshRoutingTable()
	time.Sleep(time.Nanosecond)
	b := dhts[0].RefreshRoutingTable()
	time.Sleep(time.Nanosecond)
	c := dhts[0].RefreshRoutingTable()

	// make sure that all of these eventually return
	select {
	case <-a:
	case <-ctx.Done():
		t.Fatal("first channel didn't signal")
	}
	select {
	case <-b:
	case <-ctx.Done():
		t.Fatal("second channel didn't signal")
	}
	select {
	case <-c:
	case <-ctx.Done():
		t.Fatal("third channel didn't signal")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
func TestValueGetSet(t *testing.T) {
263 264
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
265

Steven Allen's avatar
Steven Allen committed
266
	var dhts [5]*IpfsDHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267

Steven Allen's avatar
Steven Allen committed
268 269 270 271 272
	for i := range dhts {
		dhts[i] = setupDHT(ctx, t, false)
		defer dhts[i].Close()
		defer dhts[i].host.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273

Steven Allen's avatar
Steven Allen committed
274
	connect(t, ctx, dhts[0], dhts[1])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275

Steven Allen's avatar
Steven Allen committed
276
	t.Log("adding value on: ", dhts[0].self)
Jeromy's avatar
Jeromy committed
277 278
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
279
	err := dhts[0].PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280 281 282 283
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
284
	t.Log("requesting value on dhts: ", dhts[1].self)
Adin Schmahmann's avatar
Adin Schmahmann committed
285
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2*60)
Jeromy's avatar
Jeromy committed
286
	defer cancel()
Steven Allen's avatar
Steven Allen committed
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303

	val, err := dhts[1].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	// late connect

	connect(t, ctx, dhts[2], dhts[0])
	connect(t, ctx, dhts[2], dhts[1])

	t.Log("requesting value (offline) on dhts: ", dhts[2].self)
	vala, err := dhts[2].GetValue(ctxT, "/v/hello", Quorum(0))
Adin Schmahmann's avatar
Adin Schmahmann committed
304 305
	if err != nil {
		t.Fatal(err)
Steven Allen's avatar
Steven Allen committed
306 307
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
308 309 310
	if string(vala) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(vala))
	}
Steven Allen's avatar
Steven Allen committed
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
	t.Log("requesting value (online) on dhts: ", dhts[2].self)
	val, err = dhts[2].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	for _, d := range dhts[:3] {
		connect(t, ctx, dhts[3], d)
	}
	connect(t, ctx, dhts[4], dhts[3])

	t.Log("requesting value (requires peer routing) on dhts: ", dhts[4].self)
	val, err = dhts[4].GetValue(ctxT, "/v/hello")
328 329 330 331
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
332 333
	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
334
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
335 336
}

337 338 339 340 341 342 343 344 345 346 347 348
func TestValueSetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

Will Scott's avatar
Will Scott committed
349
	dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
	dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, failset bool, exp string, experr error) {
		t.Helper()

		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if failset {
			if err == nil {
				t.Error("expected set to fail")
			}
		} else {
			if err != nil {
				t.Error(err)
			}
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
			t.Errorf("Set/Get %v: Expected %v error but got %v", val, experr, err)
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
		}
	}

	// Expired records should not be set
	testSetGet("expired", true, "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", false, "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", false, "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", true, "newer", nil)
}

Aarsh Shah's avatar
Aarsh Shah committed
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
// TestContextShutDown checks that the DHT's own context is still open after
// construction and is done once Close has returned.
func TestContextShutDown(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dht := setupDHT(ctx, t, false)

	// finished reports, without blocking, whether the DHT context is done.
	finished := func() bool {
		select {
		case <-dht.Context().Done():
			return true
		default:
			return false
		}
	}

	// context is alive
	if finished() {
		t.Fatal("context should not be done")
	}

	// shut down dht
	require.NoError(t, dht.Close())

	// now context should be done
	if !finished() {
		t.Fatal("context should be done")
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
414 415 416 417 418 419 420 421 422 423 424 425 426 427
func TestSearchValue(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

Will Scott's avatar
Will Scott committed
428 429
	dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
	dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
Łukasz Magiera's avatar
Łukasz Magiera committed
430 431 432 433 434 435 436 437 438 439 440

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
441
	valCh, err := dhtA.SearchValue(ctxT, "/v/hello", Quorum(0))
442 443 444
	if err != nil {
		t.Fatal(err)
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469

	select {
	case v := <-valCh:
		if string(v) != "valid" {
			t.Errorf("expected 'valid', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}

	err = dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	select {
	case v := <-valCh:
		if string(v) != "newer" {
			t.Errorf("expected 'newer', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}
}

470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
func TestGetValues(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
	vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
	if err != nil {
		t.Fatal(err)
	}

	if len(vals) != 2 {
		t.Fatalf("expected to get 2 values, got %d", len(vals))
	}

508
	sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })
509 510 511 512 513 514 515 516 517

	if string(vals[0].Val) != "valid" {
		t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
	}
	if string(vals[1].Val) != "valid" {
		t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
	}
}

518 519 520 521 522 523 524 525 526 527 528 529 530
func TestValueGetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
Will Scott's avatar
Will Scott committed
531
	dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
532 533 534 535

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, exp string, experr error) {
536 537
		t.Helper()

538 539 540 541
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if err != nil {
542
			t.Error(err)
543 544 545 546 547 548
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
Łukasz Magiera's avatar
Łukasz Magiera committed
549
			t.Errorf("Set/Get %v: Expected '%v' error but got '%v'", val, experr, err)
550 551
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
552 553 554 555 556 557 558 559 560 561 562 563 564
		}
	}

	// Expired records should not be returned
	testSetGet("expired", "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", "newer", nil)
}

565
func TestInvalidMessageSenderTracking(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
566 567 568
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

569
	dht := setupDHT(ctx, t, false)
Steven Allen's avatar
Steven Allen committed
570 571
	defer dht.Close()

572
	foo := peer.ID("asdasd")
573
	_, err := dht.msgSender.messageSenderForPeer(ctx, foo)
574 575 576 577
	if err == nil {
		t.Fatal("that shouldnt have succeeded")
	}

578 579 580
	dht.msgSender.smlk.Lock()
	mscnt := len(dht.msgSender.strmap)
	dht.msgSender.smlk.Unlock()
Steven Allen's avatar
Steven Allen committed
581 582

	if mscnt > 0 {
583 584 585 586
		t.Fatal("should have no message senders in map")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
587 588
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
589 590
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
591

592
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593 594
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
595
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
596
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
597 598 599
		}
	}()

600 601 602
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
603

604
	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
605
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
606
		if err := dhts[3].Provide(ctx, k, true); err != nil {
607 608
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
609 610
	}

611 612 613 614
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
615
	for _, c := range testCaseCids {
616 617
		n = (n + 1) % 3

Matt Joiner's avatar
Matt Joiner committed
618
		logger.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
619 620
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
621
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

Jeromy's avatar
Jeromy committed
637 638
func TestLocalProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
639 640
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Jeromy's avatar
Jeromy committed
641

642
	dhts := setupDHTS(t, ctx, 4)
Jeromy's avatar
Jeromy committed
643 644 645 646 647 648 649 650 651 652 653 654
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
655
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
656 657 658 659 660 661 662 663 664
		if err := dhts[3].Provide(ctx, k, false); err != nil {
			t.Fatal(err)
		}
	}

	time.Sleep(time.Millisecond * 10)

	for _, c := range testCaseCids {
		for i := 0; i < 3; i++ {
665
			provs := dhts[i].ProviderManager.GetProviders(ctx, c.Hash())
Jeromy's avatar
Jeromy committed
666 667 668 669 670 671 672
			if len(provs) > 0 {
				t.Fatal("shouldnt know this")
			}
		}
	}
}

673
// if minPeers or avgPeers is 0, dont test for it.
Steven Allen's avatar
Steven Allen committed
674
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) {
675
	// test "well-formed-ness" (>= minPeers peers in every routing table)
Steven Allen's avatar
Steven Allen committed
676
	t.Helper()
677 678 679 680 681 682 683

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
684
				//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
Steven Allen's avatar
Steven Allen committed
701 702
			t.Errorf("failed to reach well-formed routing tables after %s", timeout)
			return
703 704
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
Steven Allen's avatar
Steven Allen committed
705 706
				// succeeded
				return
707 708 709 710 711 712 713
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
714
	fmt.Printf("checking routing table of %d\n", len(dhts))
715 716 717 718 719 720 721
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

722
func TestRefresh(t *testing.T) {
723 724 725 726
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
727 728
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
729

730
	nDHTs := 30
731
	dhts := setupDHTS(t, ctx, nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
732 733 734
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
735
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
736 737 738 739 740 741 742 743
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

744
	<-time.After(100 * time.Millisecond)
745
	// bootstrap a few times until we get good tables.
Steven Allen's avatar
Steven Allen committed
746 747 748 749
	t.Logf("bootstrapping them so they find each other %d", nDHTs)
	ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
	defer cancelT()

Aarsh Shah's avatar
Aarsh Shah committed
750 751 752 753 754 755 756 757 758
	for ctxT.Err() == nil {
		bootstrap(t, ctxT, dhts)

		// wait a bit.
		select {
		case <-time.After(50 * time.Millisecond):
			continue // being explicit
		case <-ctxT.Done():
			return
759
		}
Aarsh Shah's avatar
Aarsh Shah committed
760
	}
761

Aarsh Shah's avatar
Aarsh Shah committed
762
	waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second)
Steven Allen's avatar
Steven Allen committed
763

Steven Allen's avatar
Steven Allen committed
764
	cancelT()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
765

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
766 767
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
768 769 770 771
		printRoutingTables(dhts)
	}
}

772
func TestRefreshBelowMinRTThreshold(t *testing.T) {
Aarsh Shah's avatar
Aarsh Shah committed
773
	ctx := context.Background()
774 775 776 777 778

	// enable auto bootstrap on A
	dhtA, err := New(
		ctx,
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
Adin Schmahmann's avatar
Adin Schmahmann committed
779
		testPrefix,
780 781
		Mode(ModeServer),
		NamespacedValidator("v", blankValidator{}),
782 783 784 785 786
	)
	if err != nil {
		t.Fatal(err)
	}

Aarsh Shah's avatar
Aarsh Shah committed
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804
	dhtB := setupDHT(ctx, t, false)
	dhtC := setupDHT(ctx, t, false)

	defer func() {
		dhtA.Close()
		dhtA.host.Close()

		dhtB.Close()
		dhtB.host.Close()

		dhtC.Close()
		dhtC.host.Close()
	}()

	connect(t, ctx, dhtA, dhtB)
	connect(t, ctx, dhtB, dhtC)

	// we ONLY init bootstrap on A
805
	dhtA.RefreshRoutingTable()
Aarsh Shah's avatar
Aarsh Shah committed
806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832
	// and wait for one round to complete i.e. A should be connected to both B & C
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)

	// now we create two new peers
	dhtD := setupDHT(ctx, t, false)
	dhtE := setupDHT(ctx, t, false)

	// connect them to each other
	connect(t, ctx, dhtD, dhtE)
	defer func() {
		dhtD.Close()
		dhtD.host.Close()

		dhtE.Close()
		dhtE.host.Close()
	}()

	// and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap.
	// since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens,
	// it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected)
	connect(t, ctx, dhtA, dhtD)

	// and because of the above bootstrap, A also discovers E !
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
// TestQueryWithEmptyRTShouldNotPanic runs each query entry point on a DHT
// that has not been connected to any peer and checks the result each one
// returns.
func TestQueryWithEmptyRTShouldNotPanic(t *testing.T) {
	ctx := context.Background()
	d := setupDHT(ctx, t, false)
	target := testCaseCids[0]

	// FindProviders
	// TODO This swallows the error for now, should we change it ?
	provs, _ := d.FindProviders(ctx, target)
	require.Empty(t, provs)

	// GetClosestPeers
	closest, closestErr := d.GetClosestPeers(ctx, "key")
	require.Nil(t, closest)
	require.Equal(t, kb.ErrLookupFailure, closestErr)

	// GetValue
	value, valueErr := d.GetValue(ctx, "key")
	require.Empty(t, value)
	require.Error(t, valueErr)

	// SearchValue
	results, searchErr := d.SearchValue(ctx, "key")
	require.Empty(t, results)
	require.NoError(t, searchErr)

	// Provide
	provideErr := d.Provide(ctx, target, true)
	require.Equal(t, kb.ErrLookupFailure, provideErr)
}

862
func TestPeriodicRefresh(t *testing.T) {
863 864 865
	if testing.Short() {
		t.SkipNow()
	}
866 867 868
	if detectrace.WithRace() {
		t.Skip("skipping due to race detector max goroutines")
	}
869

Steven Allen's avatar
Steven Allen committed
870 871
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
872 873

	nDHTs := 30
874
	dhts := setupDHTS(t, ctx, nDHTs)
875 876 877 878 879 880 881
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

882
	t.Logf("dhts are not connected. %d", nDHTs)
883 884 885 886 887 888 889 890 891 892 893
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

894
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
895
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
896
		rtlen := dht.routingTable.Size()
897 898
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
899
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
900
	}
901

902 903 904 905
	if u.Debug {
		printRoutingTables(dhts)
	}

906
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Aarsh Shah's avatar
Aarsh Shah committed
907
	var wg sync.WaitGroup
Matt Joiner's avatar
Matt Joiner committed
908
	for _, dht := range dhts {
Aarsh Shah's avatar
Aarsh Shah committed
909 910 911 912 913
		wg.Add(1)
		go func(d *IpfsDHT) {
			<-d.RefreshRoutingTable()
			wg.Done()
		}(dht)
914
	}
915

Aarsh Shah's avatar
Aarsh Shah committed
916
	wg.Wait()
917 918
	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
919
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
920 921 922

	if u.Debug {
		printRoutingTables(dhts)
923
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
924 925
}

926
func TestProvidesMany(t *testing.T) {
927 928 929
	if detectrace.WithRace() {
		t.Skip("skipping due to race detector max goroutines")
	}
Steven Allen's avatar
Steven Allen committed
930 931
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
932 933

	nDHTs := 40
934
	dhts := setupDHTS(t, ctx, nDHTs)
935 936 937
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
938
			defer dhts[i].host.Close()
939 940 941 942 943 944 945 946
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

947
	<-time.After(100 * time.Millisecond)
948
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
949 950
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
951 952
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
953 954 955 956 957 958 959 960
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
961
	}
962

963
	providers := make(map[cid.Cid]peer.ID)
964

965
	d := 0
966
	for _, c := range testCaseCids {
967 968
		d = (d + 1) % len(dhts)
		dht := dhts[d]
969
		providers[c] = dht.self
970

971
		t.Logf("announcing provider for %s", c)
Jeromy's avatar
Jeromy committed
972
		if err := dht.Provide(ctx, c, true); err != nil {
973 974
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
975 976
	}

977 978
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
979

980 981
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
982 983
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
984 985

	var wg sync.WaitGroup
986
	getProvider := func(dht *IpfsDHT, k cid.Cid) {
987
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
988

989
		expected := providers[k]
990

991 992 993
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
994 995
			actual := prov.ID
			if actual == "" {
996
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
997 998 999
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
1000 1001 1002
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
1003
		}
1004 1005
	}

1006
	for _, c := range testCaseCids {
1007 1008
		// everyone should be able to find it...
		for _, dht := range dhts {
Matt Joiner's avatar
Matt Joiner committed
1009
			logger.Debugf("getting providers for %s at %s", c, dht.self)
1010
			wg.Add(1)
1011
			go getProvider(dht, c)
1012
		}
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1023 1024 1025
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1026
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1027
	// t.Skip("skipping test to debug another")
1028 1029 1030
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1031

Steven Allen's avatar
Steven Allen committed
1032 1033
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1034

1035
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1036 1037
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1038
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1039
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1040 1041 1042
		}
	}()

1043 1044 1045
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1046

Jeromy's avatar
Jeromy committed
1047
	err := dhts[3].Provide(ctx, testCaseCids[0], true)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1048 1049 1050 1051 1052 1053
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
1054 1055
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
1056
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1057
	select {
Jeromy's avatar
Jeromy committed
1058 1059 1060 1061
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
1062
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
1063 1064
			t.Fatal("Got back nil provider!")
		}
1065
		if p.ID != dhts[3].self {
1066
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1067
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1068
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1069 1070 1071 1072
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1073
func TestLayeredGet(t *testing.T) {
1074 1075
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
1076

1077
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1078 1079
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1080
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1081
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1082 1083 1084
		}
	}()

1085 1086
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
1087
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1088

1089
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1090 1091 1092 1093
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1094
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1095

Jeromy's avatar
Jeromy committed
1096 1097
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1098 1099 1100
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1101
	}
1102 1103 1104

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1105 1106 1107
	}
}

1108 1109 1110 1111 1112 1113 1114 1115
func TestUnfindablePeer(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1116
	dhts := setupDHTS(t, ctx, 4)
1117 1118 1119
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
1120
			dhts[i].Host().Close()
1121 1122 1123 1124 1125 1126 1127 1128
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[2], dhts[3])

	// Give DHT 1 a bad addr for DHT 2.
1129 1130
	dhts[1].host.Peerstore().ClearAddrs(dhts[2].PeerID())
	dhts[1].host.Peerstore().AddAddr(dhts[2].PeerID(), dhts[0].Host().Addrs()[0], time.Minute)
1131 1132 1133

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1134
	_, err := dhts[0].FindPeer(ctxT, dhts[3].PeerID())
1135 1136 1137 1138 1139 1140 1141 1142
	if err == nil {
		t.Error("should have failed to find peer")
	}
	if ctxT.Err() != nil {
		t.Error("FindPeer should have failed before context expired")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1143
func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1144
	// t.Skip("skipping test to debug another")
1145 1146 1147
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1148

Steven Allen's avatar
Steven Allen committed
1149 1150
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1151

1152
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1153 1154
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1155
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1156
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1157 1158 1159
		}
	}()

1160 1161 1162
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1163

Jeromy's avatar
Jeromy committed
1164 1165
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1166
	p, err := dhts[0].FindPeer(ctxT, dhts[2].PeerID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1167 1168 1169 1170
	if err != nil {
		t.Fatal(err)
	}

1171
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1172 1173 1174
		t.Fatal("Failed to find peer.")
	}

1175
	if p.ID != dhts[2].PeerID() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1176 1177 1178
		t.Fatal("Didnt find expected peer.")
	}
}
1179 1180

func TestConnectCollision(t *testing.T) {
1181 1182 1183
	if testing.Short() {
		t.SkipNow()
	}
1184

1185
	runTimes := 10
1186

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1187
	for rtime := 0; rtime < runTimes; rtime++ {
Matt Joiner's avatar
Matt Joiner committed
1188
		logger.Info("Running Time: ", rtime)
1189

Steven Allen's avatar
Steven Allen committed
1190
		ctx, cancel := context.WithCancel(context.Background())
1191

1192 1193
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
1194

1195 1196
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1197

1198 1199
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1200

1201
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1202
		go func() {
1203 1204
			dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
1205
			err := dhtA.host.Connect(ctx, pi)
1206
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1207 1208
		}()
		go func() {
1209 1210
			dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
1211
			err := dhtB.host.Connect(ctx, pi)
1212
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1213 1214
		}()

1215
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1216
		select {
1217 1218 1219 1220
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1221 1222 1223 1224
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
1225 1226 1227 1228
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1229 1230 1231 1232
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1233 1234
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1235 1236
		dhtA.host.Close()
		dhtB.host.Close()
Steven Allen's avatar
Steven Allen committed
1237
		cancel()
Jeromy's avatar
Jeromy committed
1238
	}
1239
}
1240 1241 1242 1243 1244

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1245
	d := setupDHT(ctx, t, false)
1246 1247 1248 1249 1250 1251

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
1252

1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281
func TestAtomicPut(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := setupDHT(ctx, t, false)
	d.Validator = testAtomicPutValidator{}

	// fnc to put a record
	key := "testkey"
	putRecord := func(value []byte) error {
		rec := record.MakePutRecord(key, value)
		pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
		pmes.Record = rec
		_, err := d.handlePutValue(ctx, "testpeer", pmes)
		return err
	}

	// put a valid record
	if err := putRecord([]byte("valid")); err != nil {
		t.Fatal("should not have errored on a valid record")
	}

	// simultaneous puts for old & new values
	values := [][]byte{[]byte("newer1"), []byte("newer7"), []byte("newer3"), []byte("newer5")}
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(v []byte) {
			defer wg.Done()
Steven Allen's avatar
Steven Allen committed
1282
			_ = putRecord(v) // we expect some of these to fail
1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297
		}(v)
	}
	wg.Wait()

	// get should return the newest value
	pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
	msg, err := d.handleGetValue(ctx, "testkey", pmes)
	if err != nil {
		t.Fatalf("should not have errored on final get, but got %+v", err)
	}
	if string(msg.GetRecord().Value) != "newer7" {
		t.Fatalf("Expected 'newer7' got '%s'", string(msg.GetRecord().Value))
	}
}

1298 1299 1300 1301 1302 1303 1304 1305 1306
func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

1307
	c := testCaseCids[0]
1308
	p := peer.ID("TestPeer")
1309
	a.ProviderManager.AddProvider(ctx, c.Hash(), p)
1310
	time.Sleep(time.Millisecond * 5) // just in case...
1311

1312
	provs, err := b.FindProviders(ctx, c)
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
Steven Allen's avatar
Steven Allen committed
1324 1325 1326 1327 1328 1329 1330 1331 1332
	if a.routingTable.Find(b.self) != "" {
		t.Fatal("DHT clients should not be added to routing tables")
	}
	if b.routingTable.Find(a.self) == "" {
		t.Fatal("DHT server should have been added to the dht client's routing table")
	}
}

func TestClientModeFindPeer(t *testing.T) {
1333
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Steven Allen's avatar
Steven Allen committed
1334 1335 1336 1337 1338 1339
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)
	c := setupDHT(ctx, t, true)

1340 1341
	connectNoSync(t, ctx, b, a)
	connectNoSync(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1342 1343

	// Can't use `connect` because b and c are only clients.
1344 1345
	wait(t, ctx, b, a)
	wait(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358

	pi, err := c.FindPeer(ctx, b.self)
	if err != nil {
		t.Fatal(err)
	}
	if len(pi.Addrs) == 0 {
		t.Fatal("should have found addresses for node b")
	}

	err = c.host.Connect(ctx, pi)
	if err != nil {
		t.Fatal(err)
	}
1359
}
1360

Matt Joiner's avatar
Matt Joiner committed
1361 1362 1363 1364
// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func TestFindPeerQueryMinimal(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
1369
	testFindPeerQuery(t, 2, 22, 1)
Matt Joiner's avatar
Matt Joiner committed
1370 1371
}

1372
func TestFindPeerQuery(t *testing.T) {
1373 1374 1375 1376
	if detectrace.WithRace() {
		t.Skip("skipping due to race detector max goroutines")
	}

Matt Joiner's avatar
Matt Joiner committed
1377 1378 1379
	if testing.Short() {
		t.Skip("skipping test in short mode")
	}
Steven Allen's avatar
Steven Allen committed
1380
	testFindPeerQuery(t, 5, 40, 3)
Matt Joiner's avatar
Matt Joiner committed
1381 1382
}

Aarsh Shah's avatar
Aarsh Shah committed
1383
// NOTE: You must have ATLEAST (minRTRefreshThreshold+1) test peers before using this.
Matt Joiner's avatar
Matt Joiner committed
1384 1385 1386
func testFindPeerQuery(t *testing.T,
	bootstrappers, // Number of nodes connected to the querying node
	leafs, // Number of nodes that might be connected to from the bootstrappers
Steven Allen's avatar
Steven Allen committed
1387
	bootstrapConns int, // Number of bootstrappers each leaf should connect to.
Matt Joiner's avatar
Matt Joiner committed
1388
) {
1389 1390 1391
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

Steven Allen's avatar
Steven Allen committed
1392
	dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs, BucketSize(4))
1393
	defer func() {
Matt Joiner's avatar
Matt Joiner committed
1394 1395 1396
		for _, d := range dhts {
			d.Close()
			d.host.Close()
1397 1398 1399
		}
	}()

Steven Allen's avatar
Steven Allen committed
1400 1401
	t.Log("connecting")

Jeromy's avatar
Jeromy committed
1402
	mrand := rand.New(rand.NewSource(42))
1403 1404
	guy := dhts[0]
	others := dhts[1:]
Steven Allen's avatar
Steven Allen committed
1405 1406 1407
	for i := 0; i < leafs; i++ {
		for _, v := range mrand.Perm(bootstrappers)[:bootstrapConns] {
			connectNoSync(t, ctx, others[v], others[bootstrappers+i])
1408 1409 1410
		}
	}

Matt Joiner's avatar
Matt Joiner committed
1411
	for i := 0; i < bootstrappers; i++ {
Aarsh Shah's avatar
Aarsh Shah committed
1412
		connectNoSync(t, ctx, guy, others[i])
1413 1414
	}

Steven Allen's avatar
Steven Allen committed
1415 1416
	t.Log("waiting for routing tables")

Aarsh Shah's avatar
Aarsh Shah committed
1417
	// give some time for things to settle down
Steven Allen's avatar
Steven Allen committed
1418
	waitForWellFormedTables(t, dhts, bootstrapConns, bootstrapConns, 5*time.Second)
Aarsh Shah's avatar
Aarsh Shah committed
1419

Steven Allen's avatar
Steven Allen committed
1420 1421 1422 1423 1424 1425 1426 1427 1428
	t.Log("refreshing")

	var wg sync.WaitGroup
	for _, dht := range dhts {
		wg.Add(1)
		go func(d *IpfsDHT) {
			<-d.RefreshRoutingTable()
			wg.Done()
		}(dht)
1429 1430
	}

Steven Allen's avatar
Steven Allen committed
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
	wg.Wait()

	t.Log("waiting for routing tables again")

	// Wait for refresh to work. At least one bucket should be full.
	waitForWellFormedTables(t, dhts, 4, 0, 5*time.Second)

	var peers []peer.ID
	for _, d := range others {
		peers = append(peers, d.PeerID())
Matt Joiner's avatar
Matt Joiner committed
1441
	}
Steven Allen's avatar
Steven Allen committed
1442 1443

	t.Log("querying")
Matt Joiner's avatar
Matt Joiner committed
1444

1445 1446 1447 1448
	val := "foobar"
	rtval := kb.ConvertKey(val)

	out, err := guy.GetClosestPeers(ctx, val)
Matt Joiner's avatar
Matt Joiner committed
1449
	require.NoError(t, err)
1450 1451 1452 1453 1454 1455

	var outpeers []peer.ID
	for p := range out {
		outpeers = append(outpeers, p)
	}

Jeromy's avatar
Jeromy committed
1456
	sort.Sort(peer.IDSlice(outpeers))
Steven Allen's avatar
Steven Allen committed
1457

Steven Allen's avatar
Steven Allen committed
1458
	exp := kb.SortClosestPeers(peers, rtval)[:minInt(guy.bucketSize, len(peers))]
Matt Joiner's avatar
Matt Joiner committed
1459
	t.Logf("got %d peers", len(outpeers))
1460
	got := kb.SortClosestPeers(outpeers, rtval)
Jeromy's avatar
Jeromy committed
1461

Matt Joiner's avatar
Matt Joiner committed
1462
	assert.EqualValues(t, exp, got)
1463 1464
}

1465 1466 1467 1468 1469
func TestFindClosestPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 30
1470
	dhts := setupDHTS(t, ctx, nDHTs)
1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

1483 1484
	querier := dhts[1]
	peers, err := querier.GetClosestPeers(ctx, "foo")
1485 1486 1487 1488 1489 1490 1491 1492 1493
	if err != nil {
		t.Fatal(err)
	}

	var out []peer.ID
	for p := range peers {
		out = append(out, p)
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
1494 1495
	if len(out) < querier.beta {
		t.Fatalf("got wrong number of peers (got %d, expected at least %d)", len(out), querier.beta)
1496 1497
	}
}
1498

1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535
// TestFixLowPeers connects one node to minRTRefreshThreshold+4 others, then
// removes most of them from its routing table and checks that the table
// recovers to minRTRefreshThreshold peers.
func TestFixLowPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhts := setupDHTS(t, ctx, minRTRefreshThreshold+5)

	defer func() {
		for _, d := range dhts {
			d.Close()
			d.Host().Close()
		}
	}()

	mainD := dhts[0]

	// connect it to everyone else
	for _, d := range dhts[1:] {
		mainD.peerstore.AddAddrs(d.self, d.peerstore.Addrs(d.self), peerstore.TempAddrTTL)
		require.NoError(t, mainD.Host().Connect(ctx, peer.AddrInfo{ID: d.self}))
	}

	waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold+4, 5*time.Second)

	// run a refresh on all of them
	for _, d := range dhts {
		require.NoError(t, <-d.RefreshRoutingTable())
	}

	// now remove peers from RT so threshold gets hit
	for _, d := range dhts[3:] {
		mainD.routingTable.RemovePeer(d.self)
	}

	// but we will still get enough peers in the RT because of fix low Peers
	waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
}

1536 1537
func TestProvideDisabled(t *testing.T) {
	k := testCaseCids[0]
1538
	kHash := k.Hash()
1539 1540 1541 1542 1543 1544 1545
	for i := 0; i < 3; i++ {
		enabledA := (i & 0x1) > 0
		enabledB := (i & 0x2) > 0
		t.Run(fmt.Sprintf("a=%v/b=%v", enabledA, enabledB), func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

1546
			var (
1547
				optsA, optsB []Option
1548
			)
Adin Schmahmann's avatar
Adin Schmahmann committed
1549 1550 1551
			optsA = append(optsA, ProtocolPrefix("/provMaybeDisabled"))
			optsB = append(optsB, ProtocolPrefix("/provMaybeDisabled"))

1552
			if !enabledA {
1553
				optsA = append(optsA, DisableProviders())
1554 1555
			}
			if !enabledB {
1556
				optsB = append(optsB, DisableProviders())
1557 1558 1559 1560
			}

			dhtA := setupDHT(ctx, t, false, optsA...)
			dhtB := setupDHT(ctx, t, false, optsB...)
1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581

			defer dhtA.Close()
			defer dhtB.Close()
			defer dhtA.host.Close()
			defer dhtB.host.Close()

			connect(t, ctx, dhtA, dhtB)

			err := dhtB.Provide(ctx, k, true)
			if enabledB {
				if err != nil {
					t.Fatal("put should have succeeded on node B", err)
				}
			} else {
				if err != routing.ErrNotSupported {
					t.Fatal("should not have put the value to node B", err)
				}
				_, err = dhtB.FindProviders(ctx, k)
				if err != routing.ErrNotSupported {
					t.Fatal("get should have failed on node B")
				}
1582
				provs := dhtB.ProviderManager.GetProviders(ctx, kHash)
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597
				if len(provs) != 0 {
					t.Fatal("node B should not have found local providers")
				}
			}

			provs, err := dhtA.FindProviders(ctx, k)
			if enabledA {
				if len(provs) != 0 {
					t.Fatal("node A should not have found providers")
				}
			} else {
				if err != routing.ErrNotSupported {
					t.Fatal("node A should not have found providers")
				}
			}
1598
			provAddrs := dhtA.ProviderManager.GetProviders(ctx, kHash)
1599 1600 1601 1602 1603 1604 1605
			if len(provAddrs) != 0 {
				t.Fatal("node A should not have found local providers")
			}
		})
	}
}

1606 1607
func TestHandleRemotePeerProtocolChanges(t *testing.T) {
	ctx := context.Background()
1608
	os := []Option{
Adin Schmahmann's avatar
Adin Schmahmann committed
1609
		testPrefix,
1610 1611 1612
		Mode(ModeServer),
		NamespacedValidator("v", blankValidator{}),
		DisableAutoRefresh(),
1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627
	}

	// start host 1 that speaks dht v1
	dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
	require.NoError(t, err)
	defer dhtA.Close()

	// start host 2 that also speaks dht v1
	dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
	require.NoError(t, err)
	defer dhtB.Close()

	connect(t, ctx, dhtA, dhtB)

	// now assert both have each other in their RT
Steven Allen's avatar
Steven Allen committed
1628
	waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second)
1629 1630 1631 1632 1633

	// dhtB becomes a client
	require.NoError(t, dhtB.setMode(modeClient))

	// which means that dhtA should evict it from it's RT
Steven Allen's avatar
Steven Allen committed
1634
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second)
1635 1636 1637 1638

	// dhtB becomes a server
	require.NoError(t, dhtB.setMode(modeServer))

Aarsh Shah's avatar
Aarsh Shah committed
1639
	// which means dhtA should have it in the RT again because of fixLowPeers
Steven Allen's avatar
Steven Allen committed
1640
	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second)
1641 1642
}

1643
func TestGetSetPluggedProtocol(t *testing.T) {
1644 1645 1646
	t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
1647

1648
		os := []Option{
Adin Schmahmann's avatar
Adin Schmahmann committed
1649
			ProtocolPrefix("/esh"),
1650 1651 1652
			Mode(ModeServer),
			NamespacedValidator("v", blankValidator{}),
			DisableAutoRefresh(),
1653
		}
1654

Steven Allen's avatar
Steven Allen committed
1655
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1656 1657 1658
		if err != nil {
			t.Fatal(err)
		}
1659

Steven Allen's avatar
Steven Allen committed
1660
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1661 1662 1663
		if err != nil {
			t.Fatal(err)
		}
1664

1665
		connect(t, ctx, dhtA, dhtB)
1666

1667
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
1668
		defer cancel()
1669 1670 1671
		if err := dhtA.PutValue(ctxT, "/v/cat", []byte("meow")); err != nil {
			t.Fatal(err)
		}
1672

1673 1674 1675 1676
		value, err := dhtB.GetValue(ctxT, "/v/cat")
		if err != nil {
			t.Fatal(err)
		}
1677

1678 1679 1680 1681 1682
		if string(value) != "meow" {
			t.Fatalf("Expected 'meow' got '%s'", string(value))
		}
	})

1683 1684
	t.Run("DHT routing table for peer A won't contain B if A and B don't use same protocol", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1685 1686
		defer cancel()

1687
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []Option{
Adin Schmahmann's avatar
Adin Schmahmann committed
1688
			ProtocolPrefix("/esh"),
1689 1690 1691
			Mode(ModeServer),
			NamespacedValidator("v", blankValidator{}),
			DisableAutoRefresh(),
1692
		}...)
1693 1694 1695 1696
		if err != nil {
			t.Fatal(err)
		}

1697
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []Option{
Adin Schmahmann's avatar
Adin Schmahmann committed
1698
			ProtocolPrefix("/lsr"),
1699 1700 1701
			Mode(ModeServer),
			NamespacedValidator("v", blankValidator{}),
			DisableAutoRefresh(),
1702
		}...)
1703 1704 1705 1706
		if err != nil {
			t.Fatal(err)
		}

1707
		connectNoSync(t, ctx, dhtA, dhtB)
1708

1709 1710 1711 1712 1713 1714
		// We don't expect connection notifications for A to reach B (or vice-versa), given
		// that they've been configured with different protocols - but we'll give them a
		// chance, anyhow.
		time.Sleep(time.Second * 2)

		err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
1715 1716
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
			t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
1717 1718
		}

1719 1720 1721
		v, err := dhtB.GetValue(ctx, "/v/cat")
		if v != nil || err != routing.ErrNotFound {
			t.Fatalf("get should have failed from not being able to find the value, err: '%v'", err)
1722
		}
1723
	})
1724
}
1725 1726 1727 1728 1729

func TestPing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds := setupDHTS(t, ctx, 2)
1730
	ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
1731 1732 1733 1734 1735 1736 1737 1738
	assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

func TestClientModeAtInit(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	pinger := setupDHT(ctx, t, false)
	client := setupDHT(ctx, t, true)
1739
	pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
1740
	err := pinger.Ping(context.Background(), client.PeerID())
Steven Allen's avatar
Steven Allen committed
1741
	assert.True(t, errors.Is(err, multistream.ErrNotSupported))
1742
}
1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766

// TestModeChange flips a node between client and server mode with setMode and
// checks after each step whether another node's Ping to it succeeds.
func TestModeChange(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	clientOnly := setupDHT(ctx, t, true)
	clientToServer := setupDHT(ctx, t, true)
	clientOnly.Host().Peerstore().AddAddrs(clientToServer.PeerID(), clientToServer.Host().Addrs(), peerstore.AddressTTL)

	// Both start as clients: the ping is rejected.
	err := clientOnly.Ping(ctx, clientToServer.PeerID())
	assert.True(t, errors.Is(err, multistream.ErrNotSupported))

	// After switching to server mode the ping succeeds.
	err = clientToServer.setMode(modeServer)
	assert.Nil(t, err)
	err = clientOnly.Ping(ctx, clientToServer.PeerID())
	assert.Nil(t, err)

	// Back in client mode the ping fails again.
	err = clientToServer.setMode(modeClient)
	assert.Nil(t, err)
	err = clientOnly.Ping(ctx, clientToServer.PeerID())
	assert.NotNil(t, err)
}

func TestDynamicModeSwitching(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1767 1768
	prober := setupDHT(ctx, t, true)               // our test harness
	node := setupDHT(ctx, t, true, Mode(ModeAuto)) // the node under test
1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789
	prober.Host().Peerstore().AddAddrs(node.PeerID(), node.Host().Addrs(), peerstore.AddressTTL)
	if _, err := prober.Host().Network().DialPeer(ctx, node.PeerID()); err != nil {
		t.Fatal(err)
	}

	emitter, err := node.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged))
	if err != nil {
		t.Fatal(err)
	}

	assertDHTClient := func() {
		err = prober.Ping(ctx, node.PeerID())
		assert.True(t, errors.Is(err, multistream.ErrNotSupported))
		if l := len(prober.RoutingTable().ListPeers()); l != 0 {
			t.Errorf("expected routing table length to be 0; instead is %d", l)
		}
	}

	assertDHTServer := func() {
		err = prober.Ping(ctx, node.PeerID())
		assert.Nil(t, err)
Aarsh Shah's avatar
Aarsh Shah committed
1790 1791
		// the node should be in the RT for the prober
		// because the prober will call fixLowPeers when the node updates it's protocols
1792 1793 1794 1795 1796
		if l := len(prober.RoutingTable().ListPeers()); l != 1 {
			t.Errorf("expected routing table length to be 1; instead is %d", l)
		}
	}

Steven Allen's avatar
Steven Allen committed
1797 1798 1799 1800
	err = emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
	if err != nil {
		t.Fatal(err)
	}
1801 1802 1803 1804
	time.Sleep(500 * time.Millisecond)

	assertDHTClient()

Steven Allen's avatar
Steven Allen committed
1805 1806 1807 1808
	err = emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
	if err != nil {
		t.Fatal(err)
	}
1809 1810 1811 1812
	time.Sleep(500 * time.Millisecond)

	assertDHTServer()

Steven Allen's avatar
Steven Allen committed
1813 1814 1815 1816
	err = emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityUnknown})
	if err != nil {
		t.Fatal(err)
	}
1817 1818 1819 1820
	time.Sleep(500 * time.Millisecond)

	assertDHTClient()
}
Adin Schmahmann's avatar
Adin Schmahmann committed
1821

Steven Allen's avatar
Steven Allen committed
1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
// TestInvalidKeys checks that the DHT query entry points reject empty keys,
// an undefined CID, and an empty peer ID instead of succeeding.
func TestInvalidKeys(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 2
	dhts := setupDHTS(t, ctx, nDHTs)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			// Deferred so that every DHT is closed before any host is.
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

	querier := dhts[0]
	_, err := querier.GetClosestPeers(ctx, "")
	if err == nil {
		t.Fatal("get closest peers should have failed")
	}

	// An undefined CID must be rejected up front: errors that indicate a
	// lookup was actually attempted count as the wrong failure.
	_, err = querier.FindProviders(ctx, cid.Cid{})
	switch err {
	case routing.ErrNotFound, routing.ErrNotSupported, kb.ErrLookupFailure:
		t.Fatal("failed with the wrong error: ", err)
	case nil:
		t.Fatal("find providers should have failed")
	}

	_, err = querier.FindPeer(ctx, peer.ID(""))
	if !errors.Is(err, peer.ErrEmptyPeerID) {
		t.Fatalf("expected to fail due to the empty peer ID, got: %v", err)
	}

	_, err = querier.GetValue(ctx, "")
	if err == nil {
		t.Fatal("get value with an empty key should have failed")
	}

	err = querier.PutValue(ctx, "", []byte("foobar"))
	if err == nil {
		t.Fatal("put value with an empty key should have failed")
	}
}
1869

1870 1871 1872 1873
func TestV1ProtocolOverride(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

Adin Schmahmann's avatar
Adin Schmahmann committed
1874 1875
	d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
	d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
1876 1877 1878
	d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2"))
	d4 := setupDHT(ctx, t, false)

Adin Schmahmann's avatar
Adin Schmahmann committed
1879
	dhts := []*IpfsDHT{d1, d2, d3, d4}
1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895

	for i, dout := range dhts {
		for _, din := range dhts[i+1:] {
			connectNoSync(t, ctx, dout, din)
		}
	}

	wait(t, ctx, d1, d2)
	wait(t, ctx, d2, d1)

	time.Sleep(time.Second)

	if d1.RoutingTable().Size() != 1 || d2.routingTable.Size() != 1 {
		t.Fatal("should have one peer in the routing table")
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
1896
	if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0 {
1897 1898 1899 1900
		t.Fatal("should have an empty routing table")
	}
}

1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923
// TestRoutingFilter installs PublicRoutingTableFilter on one of two DHTs,
// connects them, waits for the second to pick up the first, and then lets a
// short grace period elapse.
func TestRoutingFilter(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 2
	dhts := setupDHTS(t, ctx, nDHTs)
	defer func() {
		// Shut down every DHT first, then the hosts in reverse order.
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
		}
		for i := nDHTs - 1; i >= 0; i-- {
			dhts[i].host.Close()
		}
	}()

	filtered, other := dhts[0], dhts[1]
	filtered.routingTablePeerFilter = PublicRoutingTableFilter

	connectNoSync(t, ctx, filtered, other)
	wait(t, ctx, other, filtered)

	grace := time.After(200 * time.Millisecond)
	select {
	case <-grace:
	case <-ctx.Done():
		t.Fatal(ctx.Err())
	}
}
1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950

func TestBootStrapWhenRTIsEmpty(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// create three boostrap peers each of which is connected to 1 other peer.
	nBootStraps := 3
	bootstrappers := setupDHTS(t, ctx, nBootStraps)
	defer func() {
		for i := 0; i < nBootStraps; i++ {
			bootstrappers[i].Close()
			defer bootstrappers[i].host.Close()
		}
	}()

	bootstrapcons := setupDHTS(t, ctx, nBootStraps)
	defer func() {
		for i := 0; i < nBootStraps; i++ {
			bootstrapcons[i].Close()
			defer bootstrapcons[i].host.Close()
		}
	}()
	for i := 0; i < nBootStraps; i++ {
		connect(t, ctx, bootstrappers[i], bootstrapcons[i])
	}

	// convert the bootstrap addresses to a p2p address
1951
	bootstrapAddrs := make([]peer.AddrInfo, nBootStraps)
1952
	for i := 0; i < nBootStraps; i++ {
1953 1954 1955
		b := peer.AddrInfo{ID: bootstrappers[i].self,
			Addrs: bootstrappers[i].host.Addrs()}
		bootstrapAddrs[i] = b
1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025
	}

	//----------------
	// We will initialize a DHT with 1 bootstrapper, connect it to another DHT,
	// then remove the latter from the Routing Table
	// This should add the bootstrap peer and the peer that the bootstrap peer is conencted to
	// to it's Routing Table.
	// AutoRefresh needs to be enabled for this.
	dht1, err := New(
		ctx,
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
		testPrefix,
		NamespacedValidator("v", blankValidator{}),
		Mode(ModeServer),
		BootstrapPeers(bootstrapAddrs[0]),
	)
	require.NoError(t, err)
	dht2 := setupDHT(ctx, t, false)
	defer func() {
		dht1.host.Close()
		dht2.host.Close()
		dht1.Close()
		dht2.Close()
	}()
	connect(t, ctx, dht1, dht2)
	require.NoError(t, dht2.Close())
	require.NoError(t, dht2.host.Close())
	require.NoError(t, dht1.host.Network().ClosePeer(dht2.self))
	dht1.routingTable.RemovePeer(dht2.self)
	require.NotContains(t, dht2.self, dht1.routingTable.ListPeers())
	require.Eventually(t, func() bool {
		return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" &&
			dht1.routingTable.Find(bootstrapcons[0].self) != ""
	}, 5*time.Second, 500*time.Millisecond)

	//----------------
	// We will initialize a DHT with 2 bootstrappers, connect it to another DHT,
	// then remove the DHT handler from the other DHT which should make the first DHT's
	// routing table empty.
	// This should add the bootstrap peers and the peer thats the bootstrap peers are connected to
	// to it's Routing Table.
	// AutoRefresh needs to be enabled for this.
	dht1, err = New(
		ctx,
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
		testPrefix,
		NamespacedValidator("v", blankValidator{}),
		Mode(ModeServer),
		BootstrapPeers(bootstrapAddrs[1], bootstrapAddrs[2]),
	)
	require.NoError(t, err)

	dht2 = setupDHT(ctx, t, false)
	connect(t, ctx, dht1, dht2)
	defer func() {
		dht1.host.Close()
		dht2.host.Close()
		dht1.Close()
		dht2.Close()
	}()
	connect(t, ctx, dht1, dht2)
	require.NoError(t, dht2.setMode(modeClient))

	require.Eventually(t, func() bool {
		rt := dht1.routingTable

		return rt.Size() == 4 && rt.Find(bootstrappers[1].self) != "" &&
			rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
	}, 5*time.Second, 500*time.Millisecond)
}
2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095

// TestPreconnectedNodes starts a DHT on a host that is already connected to
// another DHT host and checks that a closest-peers query from the late DHT
// returns that already-connected peer.
func TestPreconnectedNodes(t *testing.T) {
	baseCtx, cancelBase := context.WithCancel(context.Background())
	defer cancelBase()
	// If this test fails it may hang so set a timeout
	ctx, cancel := context.WithTimeout(baseCtx, 10*time.Second)
	defer cancel()

	opts := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}

	// Create hosts
	h1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
	h2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

	// Setup first DHT
	d1, err := New(ctx, h1, opts...)
	if err != nil {
		t.Fatal(err)
	}

	// Connect the first host to the second
	h2Info := peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}
	if err := h1.Connect(ctx, h2Info); err != nil {
		t.Fatal(err)
	}

	// Wait until we know identify has completed by checking for supported protocols
	// TODO: Is this needed? Could we do h2.Connect(h1) and that would wait for identify to complete.
	for {
		supported, err := h2.Peerstore().SupportsProtocols(h1.ID(), d1.protocolsStrs...)
		if err != nil {
			t.Fatal(err)
		}
		if len(supported) > 0 {
			break
		}

		// Not identified yet: poll again shortly unless the deadline hit.
		select {
		case <-ctx.Done():
			t.Fatal("test hung")
		case <-time.After(100 * time.Millisecond):
		}
	}

	// Setup the second DHT
	d2, err := New(ctx, h2, opts...)
	if err != nil {
		t.Fatal(err)
	}

	// See if it works
	peerCh, err := d2.GetClosestPeers(ctx, "testkey")
	if err != nil {
		t.Fatal(err)
	}

	select {
	case <-ctx.Done():
		t.Fatal("test hung")
	case found := <-peerCh:
		if found != h1.ID() {
			t.Fatal("could not find peer")
		}
	}
}