dht_test.go 32.4 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"errors"
7
	"fmt"
8
	"math/rand"
9
	"sort"
10
	"strings"
11
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	"testing"
13
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14

15 16 17 18
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

19 20 21 22 23 24 25
	multistream "github.com/multiformats/go-multistream"

	"golang.org/x/xerrors"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

26 27
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
28

29
	cid "github.com/ipfs/go-cid"
30
	u "github.com/ipfs/go-ipfs-util"
31
	kb "github.com/libp2p/go-libp2p-kbucket"
George Antoniadis's avatar
George Antoniadis committed
32
	record "github.com/libp2p/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
33
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
34 35
	ci "github.com/libp2p/go-libp2p-testing/ci"
	travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
Jeromy's avatar
Jeromy committed
36
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
37
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39
)

40
var testCaseCids []cid.Cid
41 42 43 44

func init() {
	for i := 0; i < 100; i++ {
		v := fmt.Sprintf("%d -- value", i)
45 46 47

		mhv := u.Hash([]byte(v))
		testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
48 49 50
	}
}

51 52 53 54 55 56 57 58 59 60
// blankValidator is a permissive record validator for tests: Validate never
// returns an error and Select always picks the first candidate.
type blankValidator struct{}

// Validate accepts every key/value pair.
func (blankValidator) Validate(_ string, _ []byte) error {
	return nil
}

// Select always reports index 0 with no error.
func (blankValidator) Select(_ string, _ [][]byte) (int, error) {
	return 0, nil
}

// testValidator is a record validator for tests that understands three
// literal record values: "expired", "valid" and "newer".
type testValidator struct{}

// Select returns the index of the preferred record in bs.
//
// A "newer" record always wins, and when several are present the last one is
// chosen. If there is no "newer" record, the first "valid" record is chosen.
// When bs contains neither value, Select returns -1 and an error.
func (testValidator) Select(_ string, bs [][]byte) (int, error) {
	index := -1
	for i, b := range bs {
		switch {
		case bytes.Equal(b, []byte("newer")):
			// "newer" unconditionally overrides any earlier choice.
			index = i
		case index == -1 && bytes.Equal(b, []byte("valid")):
			// "valid" is only a fallback: it never replaces a prior pick.
			index = i
		}
	}
	if index == -1 {
		return -1, errors.New("no rec found")
	}
	return index, nil
}

// Validate rejects the literal record "expired" and accepts everything else.
func (testValidator) Validate(_ string, b []byte) error {
	if bytes.Equal(b, []byte("expired")) {
		return errors.New("expired")
	}
	return nil
}

81
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
82 83
	d, err := New(
		ctx,
Steven Allen's avatar
Steven Allen committed
84
		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
85 86 87 88 89
		opts.Client(client),
		opts.NamespacedValidator("v", blankValidator{}),
	)
	if err != nil {
		t.Fatal(err)
90
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91 92 93
	return d
}

94
func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
95
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96
	dhts := make([]*IpfsDHT, n)
97 98
	peers := make([]peer.ID, n)

99 100 101
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	for i := 0; i < n; i++ {
103
		dhts[i] = setupDHT(ctx, t, false)
104 105
		peers[i] = dhts[i].PeerID()
		addrs[i] = dhts[i].host.Addrs()[0]
106 107

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
108
			t.Fatal("While setting up DHTs address got duplicated.")
109 110 111 112
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
113
			t.Fatal("While setting up DHTs peerid got duplicated.")
114 115 116
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118
	}

119
	return dhts
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
}

122
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
123 124
	t.Helper()

125
	idB := b.self
126
	addrB := b.peerstore.Addrs(idB)
127 128
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
	}
130

131 132
	a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
	pi := peer.AddrInfo{ID: idB}
Jeromy's avatar
Jeromy committed
133
	if err := a.host.Connect(ctx, pi); err != nil {
134
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	}
136 137
}

138 139
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
140

141 142
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
143
	for a.routingTable.Find(b.self) == "" {
144 145 146 147 148
		select {
		case <-ctx.Done():
			t.Fatal(ctx.Err())
		case <-time.After(time.Millisecond * 5):
		}
149
	}
150
}
151

152 153 154 155 156
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	t.Helper()
	connectNoSync(t, ctx, a, b)
	wait(t, ctx, a, b)
	wait(t, ctx, b, a)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
160

161
	ctx, cancel := context.WithCancel(ctx)
Steven Allen's avatar
Steven Allen committed
162 163
	defer cancel()

Matt Joiner's avatar
Matt Joiner committed
164
	logger.Debugf("Bootstrapping DHTs...")
165 166 167 168 169 170

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

171
	cfg := DefaultBootstrapConfig
172 173
	cfg.Queries = 3

174 175 176
	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
177
		dht.runBootstrap(ctx, cfg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179 180
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
func TestValueGetSet(t *testing.T) {
182 183
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
184

Steven Allen's avatar
Steven Allen committed
185
	var dhts [5]*IpfsDHT
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186

Steven Allen's avatar
Steven Allen committed
187 188 189 190 191
	for i := range dhts {
		dhts[i] = setupDHT(ctx, t, false)
		defer dhts[i].Close()
		defer dhts[i].host.Close()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192

Steven Allen's avatar
Steven Allen committed
193
	connect(t, ctx, dhts[0], dhts[1])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
194

Steven Allen's avatar
Steven Allen committed
195
	t.Log("adding value on: ", dhts[0].self)
Jeromy's avatar
Jeromy committed
196 197
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
198
	err := dhts[0].PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200 201 202
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
203
	t.Log("requesting value on dhts: ", dhts[1].self)
Jeromy's avatar
Jeromy committed
204 205
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246

	val, err := dhts[1].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	// late connect

	connect(t, ctx, dhts[2], dhts[0])
	connect(t, ctx, dhts[2], dhts[1])

	t.Log("requesting value (offline) on dhts: ", dhts[2].self)
	vala, err := dhts[2].GetValue(ctxT, "/v/hello", Quorum(0))
	if vala != nil {
		t.Fatalf("offline get should have failed, got %s", string(vala))
	}
	if err != routing.ErrNotFound {
		t.Fatalf("offline get should have failed with ErrNotFound, got: %s", err)
	}

	t.Log("requesting value (online) on dhts: ", dhts[2].self)
	val, err = dhts[2].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

	for _, d := range dhts[:3] {
		connect(t, ctx, dhts[3], d)
	}
	connect(t, ctx, dhts[4], dhts[3])

	t.Log("requesting value (requires peer routing) on dhts: ", dhts[4].self)
	val, err = dhts[4].GetValue(ctxT, "/v/hello")
247 248 249 250
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
251 252
	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
253
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254 255
}

256 257 258 259 260 261 262 263 264 265 266 267
func TestValueSetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

268
	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
	dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, failset bool, exp string, experr error) {
		t.Helper()

		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if failset {
			if err == nil {
				t.Error("expected set to fail")
			}
		} else {
			if err != nil {
				t.Error(err)
			}
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
			t.Errorf("Set/Get %v: Expected %v error but got %v", val, experr, err)
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
		}
	}

	// Expired records should not be set
	testSetGet("expired", true, "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", false, "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", false, "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", true, "newer", nil)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
func TestSearchValue(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
336
	valCh, err := dhtA.SearchValue(ctxT, "/v/hello", Quorum(-1))
337 338 339
	if err != nil {
		t.Fatal(err)
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364

	select {
	case v := <-valCh:
		if string(v) != "valid" {
			t.Errorf("expected 'valid', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}

	err = dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	select {
	case v := <-valCh:
		if string(v) != "newer" {
			t.Errorf("expected 'newer', got '%s'", string(v))
		}
	case <-ctxT.Done():
		t.Fatal(ctxT.Err())
	}
}

365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402
func TestGetValues(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	connect(t, ctx, dhtA, dhtB)

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
	if err != nil {
		t.Error(err)
	}

	err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
	if err != nil {
		t.Error(err)
	}

	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
	vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
	if err != nil {
		t.Fatal(err)
	}

	if len(vals) != 2 {
		t.Fatalf("expected to get 2 values, got %d", len(vals))
	}

403
	sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })
404 405 406 407 408 409 410 411 412

	if string(vals[0].Val) != "valid" {
		t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
	}
	if string(vals[1].Val) != "valid" {
		t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
	}
}

413 414 415 416 417 418 419 420 421 422 423 424 425
func TestValueGetInvalid(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	defer dhtA.Close()
	defer dhtB.Close()
	defer dhtA.host.Close()
	defer dhtB.host.Close()

	dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
426
	dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
427 428 429 430

	connect(t, ctx, dhtA, dhtB)

	testSetGet := func(val string, exp string, experr error) {
431 432
		t.Helper()

433 434 435 436
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
		err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
		if err != nil {
437
			t.Error(err)
438 439 440 441 442 443
		}

		ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
		defer cancel()
		valb, err := dhtB.GetValue(ctxT, "/v/hello")
		if err != experr {
Łukasz Magiera's avatar
Łukasz Magiera committed
444
			t.Errorf("Set/Get %v: Expected '%v' error but got '%v'", val, experr, err)
445 446
		} else if err == nil && string(valb) != exp {
			t.Errorf("Expected '%v' got '%s'", exp, string(valb))
447 448 449 450 451 452 453 454 455 456 457 458 459
		}
	}

	// Expired records should not be returned
	testSetGet("expired", "", routing.ErrNotFound)
	// Valid record should be returned
	testSetGet("valid", "valid", nil)
	// Newer record should supersede previous record
	testSetGet("newer", "newer", nil)
	// Attempt to set older record again should be ignored
	testSetGet("valid", "newer", nil)
}

460
func TestInvalidMessageSenderTracking(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
461 462 463
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

464
	dht := setupDHT(ctx, t, false)
Steven Allen's avatar
Steven Allen committed
465 466
	defer dht.Close()

467
	foo := peer.ID("asdasd")
Steven Allen's avatar
Steven Allen committed
468
	_, err := dht.messageSenderForPeer(ctx, foo)
469 470 471 472 473
	if err == nil {
		t.Fatal("that shouldnt have succeeded")
	}

	dht.smlk.Lock()
Steven Allen's avatar
Steven Allen committed
474 475 476 477
	mscnt := len(dht.strmap)
	dht.smlk.Unlock()

	if mscnt > 0 {
478 479 480 481
		t.Fatal("should have no message senders in map")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
482 483
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
484 485
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
486

487
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
488 489
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
490
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
491
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
492 493 494
		}
	}()

495 496 497
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
498

499
	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
500
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
501
		if err := dhts[3].Provide(ctx, k, true); err != nil {
502 503
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
504 505
	}

506 507 508 509
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
510
	for _, c := range testCaseCids {
511 512
		n = (n + 1) % 3

Matt Joiner's avatar
Matt Joiner committed
513
		logger.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
514 515
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
516
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

Jeromy's avatar
Jeromy committed
532 533
func TestLocalProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Steven Allen's avatar
Steven Allen committed
534 535
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Jeromy's avatar
Jeromy committed
536

537
	dhts := setupDHTS(t, ctx, 4)
Jeromy's avatar
Jeromy committed
538 539 540 541 542 543 544 545 546 547 548 549
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])

	for _, k := range testCaseCids {
Matt Joiner's avatar
Matt Joiner committed
550
		logger.Debugf("announcing provider for %s", k)
Jeromy's avatar
Jeromy committed
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
		if err := dhts[3].Provide(ctx, k, false); err != nil {
			t.Fatal(err)
		}
	}

	time.Sleep(time.Millisecond * 10)

	for _, c := range testCaseCids {
		for i := 0; i < 3; i++ {
			provs := dhts[i].providers.GetProviders(ctx, c)
			if len(provs) > 0 {
				t.Fatal("shouldnt know this")
			}
		}
	}
}

568 569 570 571 572 573 574 575 576 577
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
578
				//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
Matt Joiner's avatar
Matt Joiner committed
595
			logger.Debugf("did not reach well-formed routing tables by %s", timeout)
596 597 598 599 600 601 602 603 604 605 606
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
607
	fmt.Printf("checking routing table of %d\n", len(dhts))
608 609 610 611 612 613 614
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
615
func TestBootstrap(t *testing.T) {
616 617 618 619
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
620 621
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
622

623
	nDHTs := 30
624
	dhts := setupDHTS(t, ctx, nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
625 626 627
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
628
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
629 630 631 632 633 634 635 636
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

637
	<-time.After(100 * time.Millisecond)
638 639 640 641
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
642
			t.Logf("bootstrapping them so they find each other %d", nDHTs)
Jeromy's avatar
Jeromy committed
643 644
			ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
645 646 647 648 649 650 651 652 653 654 655
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

656
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
657
	close(stop)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
658

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
659 660
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
661 662 663 664 665
		printRoutingTables(dhts)
	}
}

func TestPeriodicBootstrap(t *testing.T) {
666 667 668
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
669 670 671 672
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
673 674
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
675 676

	nDHTs := 30
677
	dhts := setupDHTS(t, ctx, nDHTs)
678 679 680 681 682 683 684
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

685
	cfg := DefaultBootstrapConfig
686 687
	cfg.Queries = 5

688
	t.Logf("dhts are not connected. %d", nDHTs)
689 690 691 692 693 694 695 696 697 698 699
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

700
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
701
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
702
		rtlen := dht.routingTable.Size()
703 704
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
705
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
706
	}
707

708 709 710 711
	if u.Debug {
		printRoutingTables(dhts)
	}

712
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Matt Joiner's avatar
Matt Joiner committed
713 714
	for _, dht := range dhts {
		go dht.BootstrapOnce(ctx, cfg)
715
	}
716 717 718

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
719
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
720 721 722

	if u.Debug {
		printRoutingTables(dhts)
723
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
724 725
}

726 727
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Steven Allen's avatar
Steven Allen committed
728 729
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
730 731

	nDHTs := 40
732
	dhts := setupDHTS(t, ctx, nDHTs)
733 734 735
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
736
			defer dhts[i].host.Close()
737 738 739 740 741 742 743 744
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

745
	<-time.After(100 * time.Millisecond)
746
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
747 748
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
749 750
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
751 752 753 754 755 756 757 758
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
759
	}
760

761
	providers := make(map[cid.Cid]peer.ID)
762

763
	d := 0
764
	for _, c := range testCaseCids {
765 766
		d = (d + 1) % len(dhts)
		dht := dhts[d]
767
		providers[c] = dht.self
768

769
		t.Logf("announcing provider for %s", c)
Jeromy's avatar
Jeromy committed
770
		if err := dht.Provide(ctx, c, true); err != nil {
771 772
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
773 774
	}

775 776
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
777

778 779
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
780 781
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
782 783

	var wg sync.WaitGroup
784
	getProvider := func(dht *IpfsDHT, k cid.Cid) {
785
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
786

787
		expected := providers[k]
788

789 790 791
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
792 793
			actual := prov.ID
			if actual == "" {
794
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
795 796 797
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
798 799 800
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
801
		}
802 803
	}

804
	for _, c := range testCaseCids {
805 806
		// everyone should be able to find it...
		for _, dht := range dhts {
Matt Joiner's avatar
Matt Joiner committed
807
			logger.Debugf("getting providers for %s at %s", c, dht.self)
808
			wg.Add(1)
809
			go getProvider(dht, c)
810
		}
811 812 813 814 815 816 817 818 819 820
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
821 822 823
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
824
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
825
	// t.Skip("skipping test to debug another")
826 827 828
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
829

Steven Allen's avatar
Steven Allen committed
830 831
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
832

833
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
834 835
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
836
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
837
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
838 839 840
		}
	}()

841 842 843
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
844

Jeromy's avatar
Jeromy committed
845
	err := dhts[3].Provide(ctx, testCaseCids[0], true)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
846 847 848 849 850 851
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
852 853
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
854
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
855
	select {
Jeromy's avatar
Jeromy committed
856 857 858 859
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
860
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
861 862
			t.Fatal("Got back nil provider!")
		}
863
		if p.ID != dhts[3].self {
864
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
865
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
866
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
867 868 869 870
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
871
func TestLayeredGet(t *testing.T) {
872 873
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
874

875
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
876 877
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
878
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
879
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
880 881 882
		}
	}()

883 884
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
885
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
886

887
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
888 889 890 891
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
892
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
893

Jeromy's avatar
Jeromy committed
894 895
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
896 897 898
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
899
	}
900 901 902

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
903 904 905
	}
}

906 907 908 909 910 911 912 913
func TestUnfindablePeer(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

914
	dhts := setupDHTS(t, ctx, 4)
915 916 917
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
918
			dhts[i].Host().Close()
919 920 921 922 923 924 925 926
		}
	}()

	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[2], dhts[3])

	// Give DHT 1 a bad addr for DHT 2.
927 928
	dhts[1].host.Peerstore().ClearAddrs(dhts[2].PeerID())
	dhts[1].host.Peerstore().AddAddr(dhts[2].PeerID(), dhts[0].Host().Addrs()[0], time.Minute)
929 930 931

	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
932
	_, err := dhts[0].FindPeer(ctxT, dhts[3].PeerID())
933 934 935 936 937 938 939 940
	if err == nil {
		t.Error("should have failed to find peer")
	}
	if ctxT.Err() != nil {
		t.Error("FindPeer should have failed before context expired")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
941
func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
942
	// t.Skip("skipping test to debug another")
943 944 945
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
946

Steven Allen's avatar
Steven Allen committed
947 948
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
949

950
	dhts := setupDHTS(t, ctx, 4)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
951 952
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
953
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
954
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
955 956 957
		}
	}()

958 959 960
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
961

Jeromy's avatar
Jeromy committed
962 963
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
964
	p, err := dhts[0].FindPeer(ctxT, dhts[2].PeerID())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
965 966 967 968
	if err != nil {
		t.Fatal(err)
	}

969
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
970 971 972
		t.Fatal("Failed to find peer.")
	}

973
	if p.ID != dhts[2].PeerID() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
974 975 976
		t.Fatal("Didnt find expected peer.")
	}
}
977

978
func TestFindPeersConnectedToPeer(t *testing.T) {
979 980
	t.Skip("not quite correct (see note)")

981 982 983 984
	if testing.Short() {
		t.SkipNow()
	}

Steven Allen's avatar
Steven Allen committed
985 986
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
987

988
	dhts := setupDHTS(t, ctx, 4)
989 990 991
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
992
			dhts[i].host.Close()
993 994 995 996 997
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
998 999 1000 1001
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
1002 1003 1004 1005 1006 1007

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

Jeromy's avatar
Jeromy committed
1008 1009
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
1010
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, dhts[2].PeerID())
1011 1012 1013 1014
	if err != nil {
		t.Fatal(err)
	}

1015
	// shouldFind := []peer.ID{peers[1], peers[3]}
1016
	var found []*peer.AddrInfo
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

Matt Joiner's avatar
Matt Joiner committed
1027
	logger.Warning("TestFindPeersConnectedToPeer is not quite correct")
1028 1029 1030 1031 1032
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

1033
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1034
	// t.Skip("skipping test to debug another")
1035 1036 1037
	if testing.Short() {
		t.SkipNow()
	}
1038 1039 1040
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
1041

1042
	runTimes := 10
1043

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1044
	for rtime := 0; rtime < runTimes; rtime++ {
Matt Joiner's avatar
Matt Joiner committed
1045
		logger.Info("Running Time: ", rtime)
1046

Steven Allen's avatar
Steven Allen committed
1047
		ctx, cancel := context.WithCancel(context.Background())
1048

1049 1050
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
1051

1052 1053
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1054

1055 1056
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1057

1058
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1059
		go func() {
1060 1061
			dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
1062
			err := dhtA.host.Connect(ctx, pi)
1063
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1064 1065
		}()
		go func() {
1066 1067
			dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
			pi := peer.AddrInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
1068
			err := dhtB.host.Connect(ctx, pi)
1069
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1070 1071
		}()

1072
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1073
		select {
1074 1075 1076 1077
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1078 1079 1080 1081
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
1082 1083 1084 1085
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1086 1087 1088 1089
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1090 1091
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1092 1093
		dhtA.host.Close()
		dhtB.host.Close()
Steven Allen's avatar
Steven Allen committed
1094
		cancel()
Jeromy's avatar
Jeromy committed
1095
	}
1096
}
1097 1098 1099 1100 1101

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1102
	d := setupDHT(ctx, t, false)
1103 1104 1105 1106 1107 1108

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
1109 1110 1111 1112 1113 1114 1115 1116 1117 1118

func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

1119
	c := testCaseCids[0]
1120
	p := peer.ID("TestPeer")
1121 1122
	a.providers.AddProvider(ctx, c, p)
	time.Sleep(time.Millisecond * 5) // just in case...
1123

1124
	provs, err := b.FindProviders(ctx, c)
1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
Steven Allen's avatar
Steven Allen committed
1136 1137 1138 1139 1140 1141 1142 1143 1144
	if a.routingTable.Find(b.self) != "" {
		t.Fatal("DHT clients should not be added to routing tables")
	}
	if b.routingTable.Find(a.self) == "" {
		t.Fatal("DHT server should have been added to the dht client's routing table")
	}
}

func TestClientModeFindPeer(t *testing.T) {
1145
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Steven Allen's avatar
Steven Allen committed
1146 1147 1148 1149 1150 1151
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)
	c := setupDHT(ctx, t, true)

1152 1153
	connectNoSync(t, ctx, b, a)
	connectNoSync(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1154 1155

	// Can't use `connect` because b and c are only clients.
1156 1157
	wait(t, ctx, b, a)
	wait(t, ctx, c, a)
Steven Allen's avatar
Steven Allen committed
1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170

	pi, err := c.FindPeer(ctx, b.self)
	if err != nil {
		t.Fatal(err)
	}
	if len(pi.Addrs) == 0 {
		t.Fatal("should have found addresses for node b")
	}

	err = c.host.Connect(ctx, pi)
	if err != nil {
		t.Fatal(err)
	}
1171
}
1172

Matt Joiner's avatar
Matt Joiner committed
1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184
// minInt returns the smaller of a and b; when they are equal it returns b,
// which is the same value.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// TestFindPeerQueryMinimal runs testFindPeerQuery on a small network:
// 2 bootstrappers, 22 leaf nodes, and 11 leaf connections per bootstrapper.
func TestFindPeerQueryMinimal(t *testing.T) {
	const (
		bootstrappers         = 2
		leafs                 = 22
		bootstrapperLeafConns = 11
	)
	testFindPeerQuery(t, bootstrappers, leafs, bootstrapperLeafConns)
}

1185
func TestFindPeerQuery(t *testing.T) {
Matt Joiner's avatar
Matt Joiner committed
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199
	if testing.Short() {
		t.Skip("skipping test in short mode")
	}
	if curFileLimit() < 1024 {
		t.Skip("insufficient file descriptors available")
	}
	testFindPeerQuery(t, 20, 80, 16)
}

func testFindPeerQuery(t *testing.T,
	bootstrappers, // Number of nodes connected to the querying node
	leafs, // Number of nodes that might be connected to from the bootstrappers
	bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
) {
1200 1201 1202
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

1203
	dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs)
1204
	defer func() {
Matt Joiner's avatar
Matt Joiner committed
1205 1206 1207
		for _, d := range dhts {
			d.Close()
			d.host.Close()
1208 1209 1210
		}
	}()

Jeromy's avatar
Jeromy committed
1211
	mrand := rand.New(rand.NewSource(42))
1212 1213
	guy := dhts[0]
	others := dhts[1:]
Matt Joiner's avatar
Matt Joiner committed
1214 1215 1216 1217
	for i := 0; i < bootstrappers; i++ {
		for j := 0; j < bootstrapperLeafConns; j++ {
			v := mrand.Intn(leafs)
			connect(t, ctx, others[i], others[bootstrappers+v])
1218 1219 1220
		}
	}

Matt Joiner's avatar
Matt Joiner committed
1221
	for i := 0; i < bootstrappers; i++ {
1222 1223 1224
		connect(t, ctx, guy, others[i])
	}

Matt Joiner's avatar
Matt Joiner committed
1225 1226 1227 1228 1229
	var reachableIds []peer.ID
	for i, d := range dhts {
		lp := len(d.host.Network().Peers())
		//t.Log(i, lp)
		if i != 0 && lp > 0 {
1230
			reachableIds = append(reachableIds, d.PeerID())
Matt Joiner's avatar
Matt Joiner committed
1231 1232 1233 1234
		}
	}
	t.Logf("%d reachable ids", len(reachableIds))

1235 1236 1237
	val := "foobar"
	rtval := kb.ConvertKey(val)

Jeromy's avatar
Jeromy committed
1238
	rtablePeers := guy.routingTable.NearestPeers(rtval, AlphaValue)
Matt Joiner's avatar
Matt Joiner committed
1239
	assert.Len(t, rtablePeers, minInt(bootstrappers, AlphaValue))
1240

Matt Joiner's avatar
Matt Joiner committed
1241
	assert.Len(t, guy.host.Network().Peers(), bootstrappers)
1242 1243

	out, err := guy.GetClosestPeers(ctx, val)
Matt Joiner's avatar
Matt Joiner committed
1244
	require.NoError(t, err)
1245 1246 1247 1248 1249 1250

	var outpeers []peer.ID
	for p := range out {
		outpeers = append(outpeers, p)
	}

Jeromy's avatar
Jeromy committed
1251
	sort.Sort(peer.IDSlice(outpeers))
Steven Allen's avatar
Steven Allen committed
1252

Matt Joiner's avatar
Matt Joiner committed
1253 1254
	exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(KValue, len(reachableIds))]
	t.Logf("got %d peers", len(outpeers))
1255
	got := kb.SortClosestPeers(outpeers, rtval)
Jeromy's avatar
Jeromy committed
1256

Matt Joiner's avatar
Matt Joiner committed
1257
	assert.EqualValues(t, exp, got)
1258 1259
}

1260 1261 1262 1263 1264
func TestFindClosestPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 30
1265
	dhts := setupDHTS(t, ctx, nDHTs)
1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

	peers, err := dhts[1].GetClosestPeers(ctx, "foo")
	if err != nil {
		t.Fatal(err)
	}

	var out []peer.ID
	for p := range peers {
		out = append(out, p)
	}

	if len(out) != KValue {
		t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), KValue)
	}
}
1292 1293

func TestGetSetPluggedProtocol(t *testing.T) {
1294 1295 1296
	t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
1297

1298 1299 1300 1301 1302
		os := []opts.Option{
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
		}
1303

Steven Allen's avatar
Steven Allen committed
1304
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1305 1306 1307
		if err != nil {
			t.Fatal(err)
		}
1308

Steven Allen's avatar
Steven Allen committed
1309
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
1310 1311 1312
		if err != nil {
			t.Fatal(err)
		}
1313

1314
		connect(t, ctx, dhtA, dhtB)
1315

1316
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
1317
		defer cancel()
1318 1319 1320
		if err := dhtA.PutValue(ctxT, "/v/cat", []byte("meow")); err != nil {
			t.Fatal(err)
		}
1321

1322 1323 1324 1325
		value, err := dhtB.GetValue(ctxT, "/v/cat")
		if err != nil {
			t.Fatal(err)
		}
1326

1327 1328 1329 1330 1331
		if string(value) != "meow" {
			t.Fatalf("Expected 'meow' got '%s'", string(value))
		}
	})

1332 1333
	t.Run("DHT routing table for peer A won't contain B if A and B don't use same protocol", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1334 1335
		defer cancel()

Steven Allen's avatar
Steven Allen committed
1336
		dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1337 1338 1339
			opts.Protocols("/esh/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1340
		}...)
1341 1342 1343 1344
		if err != nil {
			t.Fatal(err)
		}

Steven Allen's avatar
Steven Allen committed
1345
		dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
1346 1347 1348
			opts.Protocols("/lsr/dht"),
			opts.Client(false),
			opts.NamespacedValidator("v", blankValidator{}),
1349
		}...)
1350 1351 1352 1353
		if err != nil {
			t.Fatal(err)
		}

1354
		connectNoSync(t, ctx, dhtA, dhtB)
1355

1356 1357 1358 1359 1360 1361 1362
		// We don't expect connection notifications for A to reach B (or vice-versa), given
		// that they've been configured with different protocols - but we'll give them a
		// chance, anyhow.
		time.Sleep(time.Second * 2)

		err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1363
			t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
1364 1365
		}

1366 1367
		_, err = dhtB.GetValue(ctx, "/v/cat")
		if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
1368
			t.Fatalf("get should not have been able to find any peers in routing table, err:'%v'", err)
1369
		}
1370
	})
1371
}
1372 1373 1374 1375 1376

func TestPing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds := setupDHTS(t, ctx, 2)
1377
	ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
1378 1379 1380 1381 1382 1383 1384 1385
	assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

func TestClientModeAtInit(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	pinger := setupDHT(ctx, t, false)
	client := setupDHT(ctx, t, true)
1386
	pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
1387 1388 1389
	err := pinger.Ping(context.Background(), client.PeerID())
	assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}