dht_test.go 19.1 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"fmt"
6
	"math/rand"
7
	"sort"
8
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"testing"
10
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11

12 13
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"

14
	cid "github.com/ipfs/go-cid"
15 16 17
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
18
	netutil "github.com/libp2p/go-libp2p-netutil"
19 20
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
21
	record "github.com/libp2p/go-libp2p-record"
Jeromy's avatar
Jeromy committed
22
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
George Antoniadis's avatar
George Antoniadis committed
23 24
	ci "github.com/libp2p/go-testutil/ci"
	travisci "github.com/libp2p/go-testutil/ci/travis"
25
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
)

28 29
var testCaseValues = map[string][]byte{}
var testCaseCids []*cid.Cid
30 31 32 33 34 35

func init() {
	testCaseValues["hello"] = []byte("world")
	for i := 0; i < 100; i++ {
		k := fmt.Sprintf("%d -- key", i)
		v := fmt.Sprintf("%d -- value", i)
36 37 38 39
		testCaseValues[k] = []byte(v)

		mhv := u.Hash([]byte(v))
		testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
40 41 42
	}
}

43
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
Jeromy's avatar
Jeromy committed
44
	h := bhost.New(netutil.GenSwarmNetwork(t, ctx))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45

46
	dss := dssync.MutexWrap(ds.NewMapDatastore())
47 48 49 50 51 52
	var d *IpfsDHT
	if client {
		d = NewDHTClient(ctx, h, dss)
	} else {
		d = NewDHT(ctx, h, dss)
	}
53

54
	d.Validator["v"] = &record.ValidChecker{
55
		Func: func(string, []byte) error {
56 57 58
			return nil
		},
		Sign: false,
Jeromy's avatar
Jeromy committed
59
	}
60
	d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil }
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62 63
	return d
}

64 65
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
	dhts := make([]*IpfsDHT, n)
67 68
	peers := make([]peer.ID, n)

69 70 71
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
	for i := 0; i < n; i++ {
73
		dhts[i] = setupDHT(ctx, t, false)
74
		peers[i] = dhts[i].self
75
		addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]
76 77

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
78
			t.Fatal("While setting up DHTs address got duplicated.")
79 80 81 82
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
83
			t.Fatal("While setting up DHTs peerid got duplicated.")
84 85 86
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88 89 90 91
	}

	return addrs, peers, dhts
}

92
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
93
	idB := b.self
94
	addrB := b.peerstore.Addrs(idB)
95 96
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97
	}
98

Jeromy's avatar
Jeromy committed
99 100
	a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
	pi := pstore.PeerInfo{ID: idB}
Jeromy's avatar
Jeromy committed
101
	if err := a.host.Connect(ctx, pi); err != nil {
102
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103
	}
104 105 106 107
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	connectNoSync(t, ctx, a, b)
108

109 110
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
111 112 113 114 115 116 117
	for a.routingTable.Find(b.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}

	for b.routingTable.Find(a.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
121

122
	ctx, cancel := context.WithCancel(ctx)
Richard Littauer's avatar
Richard Littauer committed
123
	log.Debugf("Bootstrapping DHTs...")
124 125 126 127 128 129

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

130 131 132 133
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 3

134 135 136
	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
137
		dht.runBootstrap(ctx, cfg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138
	}
139
	cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
func TestValueGetSet(t *testing.T) {
143 144
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
145

146 147
	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148

149 150
	defer dhtA.Close()
	defer dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152
	defer dhtA.host.Close()
	defer dhtB.host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153

154
	vf := &record.ValidChecker{
155
		Func: func(string, []byte) error { return nil },
156
		Sign: false,
Jeromy's avatar
Jeromy committed
157
	}
158
	nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil }
159

160 161
	dhtA.Validator["v"] = vf
	dhtB.Validator["v"] = vf
162 163
	dhtA.Selector["v"] = nulsel
	dhtB.Selector["v"] = nulsel
Jeromy's avatar
Jeromy committed
164

165
	connect(t, ctx, dhtA, dhtB)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166

167
	log.Error("adding value on: ", dhtA.self)
Jeromy's avatar
Jeromy committed
168 169
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
170
	err := dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172 173 174
	if err != nil {
		t.Fatal(err)
	}

175 176 177 178 179 180 181 182 183 184 185
	/*
		ctxT, _ = context.WithTimeout(ctx, time.Second*2)
		val, err := dhtA.GetValue(ctxT, "/v/hello")
		if err != nil {
			t.Fatal(err)
		}

		if string(val) != "world" {
			t.Fatalf("Expected 'world' got '%s'", string(val))
		}
	*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186

187
	log.Error("requesting value on dht: ", dhtB.self)
Jeromy's avatar
Jeromy committed
188 189
	ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
	defer cancel()
190
	valb, err := dhtB.GetValue(ctxT, "/v/hello")
191 192 193 194
	if err != nil {
		t.Fatal(err)
	}

195 196
	if string(valb) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(valb))
197
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198 199
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200 201
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203

204
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205 206
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211
		}
	}()

212 213 214
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215

216
	for _, k := range testCaseCids {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
217
		log.Debugf("announcing provider for %s", k)
218 219 220
		if err := dhts[3].Provide(ctx, k); err != nil {
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
	}

223 224 225 226
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
227
	for _, c := range testCaseCids {
228 229
		n = (n + 1) % 3

230
		log.Debugf("getting providers for %s from %d", c, n)
Jeromy's avatar
Jeromy committed
231 232
		ctxT, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()
233
		provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
				t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
276
			log.Debugf("did not reach well-formed routing tables by %s", timeout)
277 278 279 280 281 282 283 284 285 286 287
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
288
	fmt.Printf("checking routing table of %d\n", len(dhts))
289 290 291 292 293 294 295
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
func TestBootstrap(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
297
	// t.Skip("skipping test to debug another")
298 299 300 301
	if testing.Short() {
		t.SkipNow()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
302 303
	ctx := context.Background()

304
	nDHTs := 30
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305 306 307 308
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310 311 312 313 314 315 316 317
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

318
	<-time.After(100 * time.Millisecond)
319 320 321 322
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
323
			t.Logf("bootstrapping them so they find each other %d", nDHTs)
Jeromy's avatar
Jeromy committed
324 325
			ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()
326 327 328 329 330 331 332 333 334 335 336
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

337
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
338
	close(stop)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340 341
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
342 343 344 345 346 347
		printRoutingTables(dhts)
	}
}

func TestPeriodicBootstrap(t *testing.T) {
	// t.Skip("skipping test to debug another")
348 349 350
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	nDHTs := 30
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	// signal amplifier
	amplify := func(signal chan time.Time, other []chan time.Time) {
		for t := range signal {
			for _, s := range other {
				s <- t
			}
		}
		for _, s := range other {
			close(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
375 376 377
		}
	}

378 379 380
	signal := make(chan time.Time)
	allSignals := []chan time.Time{}

381 382 383 384
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 5

385 386 387 388
	// kick off periodic bootstrappers with instrumented signals.
	for _, dht := range dhts {
		s := make(chan time.Time)
		allSignals = append(allSignals, s)
389
		dht.BootstrapOnSignal(cfg, s)
390 391 392
	}
	go amplify(signal, allSignals)

393
	t.Logf("dhts are not connected. %d", nDHTs)
394 395 396 397 398 399 400 401 402 403 404
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

405
	t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
406
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
407
		rtlen := dht.routingTable.Size()
408 409
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
410
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
411
	}
412

413 414 415 416
	if u.Debug {
		printRoutingTables(dhts)
	}

417
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
418 419 420 421
	signal <- time.Now()

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
422
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
423 424 425

	if u.Debug {
		printRoutingTables(dhts)
426
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427 428
}

429 430
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
431
	// t.Skip("skipping test to debug another")
432 433 434 435 436 437 438
	ctx := context.Background()

	nDHTs := 40
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
439
			defer dhts[i].host.Close()
440 441 442 443 444 445 446 447
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

448
	<-time.After(100 * time.Millisecond)
449
	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
Jeromy's avatar
Jeromy committed
450 451
	ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
452 453
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
454 455 456 457 458 459 460 461
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
462
	}
463

464
	providers := make(map[string]peer.ID)
465

466
	d := 0
467
	for _, c := range testCaseCids {
468 469
		d = (d + 1) % len(dhts)
		dht := dhts[d]
470
		providers[c.KeyString()] = dht.self
471

472 473
		t.Logf("announcing provider for %s", c)
		if err := dht.Provide(ctx, c); err != nil {
474 475
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
476 477
	}

478 479
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
480

481 482
	errchan := make(chan error)

Jeromy's avatar
Jeromy committed
483 484
	ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
485 486

	var wg sync.WaitGroup
487
	getProvider := func(dht *IpfsDHT, k *cid.Cid) {
488
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
489

490
		expected := providers[k.KeyString()]
491

492 493 494
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
495 496
			actual := prov.ID
			if actual == "" {
497
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
498 499 500
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
501 502 503
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
504
		}
505 506
	}

507
	for _, c := range testCaseCids {
508 509
		// everyone should be able to find it...
		for _, dht := range dhts {
510
			log.Debugf("getting providers for %s at %s", c, dht.self)
511
			wg.Add(1)
512
			go getProvider(dht, c)
513
		}
514 515 516 517 518 519 520 521 522 523
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524 525 526
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
528
	// t.Skip("skipping test to debug another")
529 530 531
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
532

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
533
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534

535
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
536 537
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
538
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
539
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
540 541 542
		}
	}()

543 544 545
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
546

547
	err := dhts[3].Provide(ctx, testCaseCids[0])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548 549 550 551 552 553
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Jeromy's avatar
Jeromy committed
554 555
	ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel()
556
	provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
557
	select {
Jeromy's avatar
Jeromy committed
558 559 560 561
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
562
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
563 564
			t.Fatal("Got back nil provider!")
		}
565
		if p.ID != dhts[3].self {
566
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
567
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
568
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
569 570 571 572
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
573
func TestLayeredGet(t *testing.T) {
574 575
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
576

577
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
578 579
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
580
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
582 583 584
		}
	}()

585 586
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
587
	connect(t, ctx, dhts[2], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
588

589
	err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
590 591 592 593
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
594
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
595

Jeromy's avatar
Jeromy committed
596 597
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
598 599 600
	val, err := dhts[0].GetValue(ctxT, "/v/hello")
	if err != nil {
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
601
	}
602 603 604

	if string(val) != "world" {
		t.Error("got wrong value")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
605 606 607 608
	}
}

func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
609
	// t.Skip("skipping test to debug another")
610 611 612
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
613

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
614
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
615

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616
	_, peers, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
617 618
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
620
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
621 622 623
		}
	}()

624 625 626
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
627

Jeromy's avatar
Jeromy committed
628 629
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
630
	p, err := dhts[0].FindPeer(ctxT, peers[2])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
631 632 633 634
	if err != nil {
		t.Fatal(err)
	}

635
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
636 637 638
		t.Fatal("Failed to find peer.")
	}

639
	if p.ID != peers[2] {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
640 641 642
		t.Fatal("Didnt find expected peer.")
	}
}
643

644
func TestFindPeersConnectedToPeer(t *testing.T) {
645 646
	t.Skip("not quite correct (see note)")

647 648 649 650 651 652 653 654 655 656
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	_, peers, dhts := setupDHTS(ctx, 4, t)
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
657
			dhts[i].host.Close()
658 659 660 661 662
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
663 664 665 666
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
667 668 669 670 671 672

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

Jeromy's avatar
Jeromy committed
673 674
	ctxT, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
675
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
676 677 678 679
	if err != nil {
		t.Fatal(err)
	}

680
	// shouldFind := []peer.ID{peers[1], peers[3]}
Jeromy's avatar
Jeromy committed
681
	var found []pstore.PeerInfo
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

	log.Warning("TestFindPeersConnectedToPeer is not quite correct")
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

698
func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
699 700 701 702 703 704 705 706 707

	if len(p1) != len(p2) {
		t.Fatal("did not find as many peers as should have", p1, p2)
	}

	ids1 := make([]string, len(p1))
	ids2 := make([]string, len(p2))

	for i, p := range p1 {
708
		ids1[i] = string(p)
709 710 711
	}

	for i, p := range p2 {
712
		ids2[i] = string(p)
713 714 715 716 717 718 719 720 721 722 723 724
	}

	sort.Sort(sort.StringSlice(ids1))
	sort.Sort(sort.StringSlice(ids2))

	for i := range ids1 {
		if ids1[i] != ids2[i] {
			t.Fatal("Didnt find expected peer", ids1[i], ids2)
		}
	}
}

725
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
726
	// t.Skip("skipping test to debug another")
727 728 729
	if testing.Short() {
		t.SkipNow()
	}
730 731 732
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
733

734
	runTimes := 10
735

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
736
	for rtime := 0; rtime < runTimes; rtime++ {
rht's avatar
rht committed
737
		log.Info("Running Time: ", rtime)
738

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
739
		ctx := context.Background()
740

741 742
		dhtA := setupDHT(ctx, t, false)
		dhtB := setupDHT(ctx, t, false)
743

744 745
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
746

747 748
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
749

750
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
751
		go func() {
Jeromy's avatar
Jeromy committed
752 753
			dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
754
			err := dhtA.host.Connect(ctx, pi)
755
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
756 757
		}()
		go func() {
Jeromy's avatar
Jeromy committed
758 759
			dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
760
			err := dhtB.host.Connect(ctx, pi)
761
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
762 763
		}()

764
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
765
		select {
766 767 768 769
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
770 771 772 773
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
774 775 776 777
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
778 779 780 781
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
782 783
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
784 785
		dhtA.host.Close()
		dhtB.host.Close()
Jeromy's avatar
Jeromy committed
786
	}
787
}
788 789 790 791 792

func TestBadProtoMessages(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

793
	d := setupDHT(ctx, t, false)
794 795 796 797 798 799

	nilrec := new(pb.Message)
	if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
		t.Fatal("should have errored on nil record")
	}
}
800 801 802 803 804 805 806 807 808 809

func TestClientModeConnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := setupDHT(ctx, t, false)
	b := setupDHT(ctx, t, true)

	connectNoSync(t, ctx, a, b)

810
	c := testCaseCids[0]
811
	p := peer.ID("TestPeer")
812 813
	a.providers.AddProvider(ctx, c, p)
	time.Sleep(time.Millisecond * 5) // just in case...
814

815
	provs, err := b.FindProviders(ctx, c)
816 817 818 819 820 821 822 823 824 825 826 827
	if err != nil {
		t.Fatal(err)
	}

	if len(provs) == 0 {
		t.Fatal("Expected to get a provider back")
	}

	if provs[0].ID != p {
		t.Fatal("expected it to be our test peer")
	}
}
828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860

func TestFindClosestPeers(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nDHTs := 30
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

	peers, err := dhts[1].GetClosestPeers(ctx, "foo")
	if err != nil {
		t.Fatal(err)
	}

	var out []peer.ID
	for p := range peers {
		out = append(out, p)
	}

	if len(out) != KValue {
		t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), KValue)
	}
}