dht_test.go 18.4 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"fmt"
6
	"math/rand"
7
	"sort"
8
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"testing"
10
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11

12 13
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"

14
	cid "github.com/ipfs/go-cid"
15 16 17 18 19 20
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	u "github.com/ipfs/go-ipfs-util"
	peer "github.com/ipfs/go-libp2p-peer"
	pstore "github.com/ipfs/go-libp2p-peerstore"
	ma "github.com/jbenet/go-multiaddr"
George Antoniadis's avatar
George Antoniadis committed
21 22 23 24
	record "github.com/libp2p/go-libp2p-record"
	netutil "github.com/libp2p/go-libp2p/p2p/test/util"
	ci "github.com/libp2p/go-testutil/ci"
	travisci "github.com/libp2p/go-testutil/ci/travis"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25 26
)

27 28
var testCaseValues = map[string][]byte{}
var testCaseCids []*cid.Cid
29 30 31 32 33 34

func init() {
	testCaseValues["hello"] = []byte("world")
	for i := 0; i < 100; i++ {
		k := fmt.Sprintf("%d -- key", i)
		v := fmt.Sprintf("%d -- value", i)
35 36 37 38
		testCaseValues[k] = []byte(v)

		mhv := u.Hash([]byte(v))
		testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
39 40 41
	}
}

42
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
	h := netutil.GenHostSwarm(t, ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44

45
	dss := dssync.MutexWrap(ds.NewMapDatastore())
46 47 48 49 50 51
	var d *IpfsDHT
	if client {
		d = NewDHTClient(ctx, h, dss)
	} else {
		d = NewDHT(ctx, h, dss)
	}
52

53
	d.Validator["v"] = &record.ValidChecker{
54
		Func: func(string, []byte) error {
55 56 57
			return nil
		},
		Sign: false,
Jeromy's avatar
Jeromy committed
58
	}
59
	d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62
	return d
}

63 64
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
	dhts := make([]*IpfsDHT, n)
66 67
	peers := make([]peer.ID, n)

68 69 70
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
	for i := 0; i < n; i++ {
72
		dhts[i] = setupDHT(ctx, t, false)
73
		peers[i] = dhts[i].self
74
		addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]
75 76

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
77
			t.Fatal("While setting up DHTs address got duplicated.")
78 79 80 81
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
82
			t.Fatal("While setting up DHTs peerid got duplicated.")
83 84 85
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88 89 90
	}

	return addrs, peers, dhts
}

91
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
92
	idB := b.self
93
	addrB := b.peerstore.Addrs(idB)
94 95
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96
	}
97

Jeromy's avatar
Jeromy committed
98 99
	a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
	pi := pstore.PeerInfo{ID: idB}
Jeromy's avatar
Jeromy committed
100
	if err := a.host.Connect(ctx, pi); err != nil {
101
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	}
103 104 105 106
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	connectNoSync(t, ctx, a, b)
107

108 109
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
110 111 112 113 114 115 116
	for a.routingTable.Find(b.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}

	for b.routingTable.Find(a.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
120

121
	ctx, cancel := context.WithCancel(ctx)
Richard Littauer's avatar
Richard Littauer committed
122
	log.Debugf("Bootstrapping DHTs...")
123 124 125 126 127 128

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

129 130 131 132
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 3

133 134 135
	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
136
		dht.runBootstrap(ctx, cfg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137
	}
138
	cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139 140
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
func TestValueGetSet(t *testing.T) {
142 143
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
144

145 146
	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147

148 149
	defer dhtA.Close()
	defer dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151
	defer dhtA.host.Close()
	defer dhtB.host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152

153
	vf := &record.ValidChecker{
154
		Func: func(string, []byte) error { return nil },
155
		Sign: false,
Jeromy's avatar
Jeromy committed
156
	}
157
	nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil }
158

159 160
	dhtA.Validator["v"] = vf
	dhtB.Validator["v"] = vf
161 162
	dhtA.Selector["v"] = nulsel
	dhtB.Selector["v"] = nulsel
Jeromy's avatar
Jeromy committed
163

164
	connect(t, ctx, dhtA, dhtB)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165

166
	log.Error("adding value on: ", dhtA.self)
Jeromy's avatar
Jeromy committed
167 168
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
169
	err := dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171 172 173
	if err != nil {
		t.Fatal(err)
	}

174 175 176 177 178 179 180 181 182 183 184
	/*
		ctxT, _ = context.WithTimeout(ctx, time.Second*2)
		val, err := dhtA.GetValue(ctxT, "/v/hello")
		if err != nil {
			t.Fatal(err)
		}

		if string(val) != "world" {
			t.Fatalf("Expected 'world' got '%s'", string(val))
		}
	*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185

186
	log.Error("requesting value on dht: ", dhtB.self)
Jeromy's avatar
Jeromy committed
187 188
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
189
	valb, err := dhtB.GetValue(ctxT, "/v/hello")
190 191 192 193
	if err != nil {
		t.Fatal(err)
	}

194 195
	if string(valb) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(valb))
196
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197 198
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202

203
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208 209 210
		}
	}()

211 212 213
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214

215
	for _, k := range testCaseCids {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216
		log.Debugf("announcing provider for %s", k)
217 218 219
		if err := dhts[3].Provide(ctx, k); err != nil {
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221
	}

222 223 224 225
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
226
	for _, c := range testCaseCids {
227 228
		n = (n + 1) % 3

229
		log.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
230 231
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
232
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
				t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
275
			log.Debugf("did not reach well-formed routing tables by %s", timeout)
276 277 278 279 280 281 282 283 284 285 286
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
287
	fmt.Printf("checking routing table of %d\n", len(dhts))
288 289 290 291 292 293 294
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
295
func TestBootstrap(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
	// t.Skip("skipping test to debug another")
297 298 299 300
	if testing.Short() {
		t.SkipNow()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301 302
	ctx := context.Background()

303
	nDHTs := 30
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
304 305 306 307
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309 310 311 312 313 314 315 316
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

317
	<-time.After(100 * time.Millisecond)
318 319 320 321
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
322
			t.Logf("bootstrapping them so they find each other %d", nDHTs)
Jeromy's avatar
Jeromy committed
323 324
			ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
325 326 327 328 329 330 331 332 333 334 335
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

336
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
337
	close(stop)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
338

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339 340
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
341 342 343 344 345 346
		printRoutingTables(dhts)
	}
}

func TestPeriodicBootstrap(t *testing.T) {
	// t.Skip("skipping test to debug another")
347 348 349
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	nDHTs := 30
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	// signal amplifier
	amplify := func(signal chan time.Time, other []chan time.Time) {
		for t := range signal {
			for _, s := range other {
				s <- t
			}
		}
		for _, s := range other {
			close(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374 375 376
		}
	}

377 378 379
	signal := make(chan time.Time)
	allSignals := []chan time.Time{}

380 381 382 383
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 5

384 385 386 387
	// kick off periodic bootstrappers with instrumented signals.
	for _, dht := range dhts {
		s := make(chan time.Time)
		allSignals = append(allSignals, s)
388
		dht.BootstrapOnSignal(cfg, s)
389 390 391
	}
	go amplify(signal, allSignals)

392
	t.Logf("dhts are not connected. %d", nDHTs)
393 394 395 396 397 398 399 400 401 402 403
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

404
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
405
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
406
		rtlen := dht.routingTable.Size()
407 408
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
409
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
410
	}
411

412 413 414 415
	if u.Debug {
		printRoutingTables(dhts)
	}

416
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
417 418 419 420
	signal <- time.Now()

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
421
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
422 423 424

	if u.Debug {
		printRoutingTables(dhts)
425
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
426 427
}

428 429
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430
	// t.Skip("skipping test to debug another")
431 432 433 434 435 436 437
	ctx := context.Background()

	nDHTs := 40
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438
			defer dhts[i].host.Close()
439 440 441 442 443 444 445 446
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

447
	<-time.After(100 * time.Millisecond)
448
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
449 450
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
451 452
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
453 454 455 456 457 458 459 460
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
461
	}
462

463
	providers := make(map[string]peer.ID)
464

465
	d := 0
466
	for _, c := range testCaseCids {
467 468
		d = (d + 1) % len(dhts)
		dht := dhts[d]
469
		providers[c.KeyString()] = dht.self
470

471 472
		t.Logf("announcing provider for %s", c)
		if err := dht.Provide(ctx, c); err != nil {
473 474
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
475 476
	}

477 478
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
479

480 481
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
482 483
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
484 485

	var wg sync.WaitGroup
486
	getProvider := func(dht *IpfsDHT, k *cid.Cid) {
487
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
488

489
		expected := providers[k.KeyString()]
490

491 492 493
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
494 495
			actual := prov.ID
			if actual == "" {
496
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
497 498 499
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
500 501 502
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
503
		}
504 505
	}

506
	for _, c := range testCaseCids {
507 508
		// everyone should be able to find it...
		for _, dht := range dhts {
509
			log.Debugf("getting providers for %s at %s", c, dht.self)
510
			wg.Add(1)
511
			go getProvider(dht, c)
512
		}
513 514 515 516 517 518 519 520 521 522
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523 524 525
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
526
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527
	// t.Skip("skipping test to debug another")
528 529 530
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
531

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
533

534
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
535 536
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
537
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
538
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
539 540 541
		}
	}()

542 543 544
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
545

546
	err := dhts[3].Provide(ctx, testCaseCids[0])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
547 548 549 550 551 552
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
553 554
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
555
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
556
	select {
Jeromy's avatar
Jeromy committed
557 558 559 560
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
561
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
562 563
			t.Fatal("Got back nil provider!")
		}
564
		if p.ID != dhts[3].self {
565
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
566
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
567
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
568 569 570 571
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
572
func TestLayeredGet(t *testing.T) {
573 574
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
575

576
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
577 578
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
579
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
580
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581 582 583
		}
	}()

584 585
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
586
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
587

588
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
589 590 591 592
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
594

Jeromy's avatar
Jeromy committed
595 596
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
597 598 599
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
600
	}
601 602 603

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
604 605 606 607
	}
}

func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
608
	// t.Skip("skipping test to debug another")
609 610 611
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
612

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
613
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
614

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
615
	_, peers, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616 617
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
618
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
620 621 622
		}
	}()

623 624 625
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
626

Jeromy's avatar
Jeromy committed
627 628
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
629
	p, err := dhts[0].FindPeer(ctxT, peers[2])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
630 631 632 633
	if err != nil {
		t.Fatal(err)
	}

634
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
635 636 637
		t.Fatal("Failed to find peer.")
	}

638
	if p.ID != peers[2] {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
639 640 641
		t.Fatal("Didnt find expected peer.")
	}
}
642

643
func TestFindPeersConnectedToPeer(t *testing.T) {
644 645
	t.Skip("not quite correct (see note)")

646 647 648 649 650 651 652 653 654 655
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	_, peers, dhts := setupDHTS(ctx, 4, t)
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
656
			dhts[i].host.Close()
657 658 659 660 661
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
662 663 664 665
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
666 667 668 669 670 671

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

Jeromy's avatar
Jeromy committed
672 673
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
674
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
675 676 677 678
	if err != nil {
		t.Fatal(err)
	}

679
	// shouldFind := []peer.ID{peers[1], peers[3]}
Jeromy's avatar
Jeromy committed
680
	var found []pstore.PeerInfo
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

	log.Warning("TestFindPeersConnectedToPeer is not quite correct")
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

697
func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
698 699 700 701 702 703 704 705 706

	if len(p1) != len(p2) {
		t.Fatal("did not find as many peers as should have", p1, p2)
	}

	ids1 := make([]string, len(p1))
	ids2 := make([]string, len(p2))

	for i, p := range p1 {
707
		ids1[i] = string(p)
708 709 710
	}

	for i, p := range p2 {
711
		ids2[i] = string(p)
712 713 714 715 716 717 718 719 720 721 722 723
	}

	sort.Sort(sort.StringSlice(ids1))
	sort.Sort(sort.StringSlice(ids2))

	for i := range ids1 {
		if ids1[i] != ids2[i] {
			t.Fatal("Didnt find expected peer", ids1[i], ids2)
		}
	}
}

724
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
725
	// t.Skip("skipping test to debug another")
726 727 728
	if testing.Short() {
		t.SkipNow()
	}
729 730 731
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
732

733
	runTimes := 10
734

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
735
	for rtime := 0; rtime < runTimes; rtime++ {
rht's avatar
rht committed
736
		log.Info("Running Time: ", rtime)
737

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
738
		ctx := context.Background()
739

740 741
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
742

743 744
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
745

746 747
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
748

749
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
750
		go func() {
Jeromy's avatar
Jeromy committed
751 752
			dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
753
			err := dhtA.host.Connect(ctx, pi)
754
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
755 756
		}()
		go func() {
Jeromy's avatar
Jeromy committed
757 758
			dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
759
			err := dhtB.host.Connect(ctx, pi)
760
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
761 762
		}()

763
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
764
		select {
765 766 767 768
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
769 770 771 772
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
773 774 775 776
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
777 778 779 780
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
781 782
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
783 784
		dhtA.host.Close()
		dhtB.host.Close()
Jeromy's avatar
Jeromy committed
785
	}
786
}
787 788 789 790 791

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

792
	d := setupDHT(ctx, t, false)
793 794 795 796 797 798

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
799 800 801 802 803 804 805 806 807 808

func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

809
	c := testCaseCids[0]
810
	p := peer.ID("TestPeer")
811 812
	a.providers.AddProvider(ctx, c, p)
	time.Sleep(time.Millisecond * 5) // just in case...
813

814
	provs, err := b.FindProviders(ctx, c)
815 816 817 818 819 820 821 822 823 824 825 826
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
}