dht_test.go 18.6 KB
Newer Older
1 2
package dht

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"bytes"
5
	"fmt"
6
	"math/rand"
7
	"sort"
8
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"testing"
10
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11

12
	key "github.com/ipfs/go-ipfs/blocks/key"
13 14
	routing "github.com/ipfs/go-ipfs/routing"
	record "github.com/ipfs/go-ipfs/routing/record"
15 16
	ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci"
	travisci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
Jakub Sztandera's avatar
Jakub Sztandera committed
17 18
	ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
	dssync "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/sync"
Jeromy's avatar
Jeromy committed
19

20 21
	pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore"
	peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
22
	ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
23
	netutil "gx/ipfs/QmZ8bCZpMWDbFSh6h2zgTYwrhnjrGM5c9WCzw72SU8p63b/go-libp2p/p2p/test/util"
Jeromy's avatar
Jeromy committed
24 25
	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
)

28
var testCaseValues = map[key.Key][]byte{}
29 30 31 32 33 34

func init() {
	testCaseValues["hello"] = []byte("world")
	for i := 0; i < 100; i++ {
		k := fmt.Sprintf("%d -- key", i)
		v := fmt.Sprintf("%d -- value", i)
35
		testCaseValues[key.Key(k)] = []byte(v)
36 37 38
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40
func setupDHT(ctx context.Context, t *testing.T) *IpfsDHT {
	h := netutil.GenHostSwarm(t, ctx)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41

42
	dss := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
	d := NewDHT(ctx, h, dss)
44

45
	d.Validator["v"] = &record.ValidChecker{
46
		Func: func(key.Key, []byte) error {
47 48 49
			return nil
		},
		Sign: false,
Jeromy's avatar
Jeromy committed
50
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51 52 53
	return d
}

54 55
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
	addrs := make([]ma.Multiaddr, n)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
	dhts := make([]*IpfsDHT, n)
57 58
	peers := make([]peer.ID, n)

59 60 61
	sanityAddrsMap := make(map[string]struct{})
	sanityPeersMap := make(map[string]struct{})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	for i := 0; i < n; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
		dhts[i] = setupDHT(ctx, t)
64
		peers[i] = dhts[i].self
65
		addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]
66 67

		if _, lol := sanityAddrsMap[addrs[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
68
			t.Fatal("While setting up DHTs address got duplicated.")
69 70 71 72
		} else {
			sanityAddrsMap[addrs[i].String()] = struct{}{}
		}
		if _, lol := sanityPeersMap[peers[i].String()]; lol {
Jakub Sztandera's avatar
Jakub Sztandera committed
73
			t.Fatal("While setting up DHTs peerid got duplicated.")
74 75 76
		} else {
			sanityPeersMap[peers[i].String()] = struct{}{}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77 78 79 80 81
	}

	return addrs, peers, dhts
}

82
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
83

84
	idB := b.self
85
	addrB := b.peerstore.Addrs(idB)
86 87
	if len(addrB) == 0 {
		t.Fatal("peers setup incorrectly: no local address")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	}
89

Jeromy's avatar
Jeromy committed
90 91
	a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
	pi := pstore.PeerInfo{ID: idB}
Jeromy's avatar
Jeromy committed
92
	if err := a.host.Connect(ctx, pi); err != nil {
93
		t.Fatal(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94
	}
95

96 97
	// loop until connection notification has been received.
	// under high load, this may not happen as immediately as we would like.
98 99 100 101 102 103 104
	for a.routingTable.Find(b.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}

	for b.routingTable.Find(a.self) == "" {
		time.Sleep(time.Millisecond * 5)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
108

109
	ctx, cancel := context.WithCancel(ctx)
Richard Littauer's avatar
Richard Littauer committed
110
	log.Debugf("Bootstrapping DHTs...")
111 112 113 114 115 116

	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

117 118 119 120
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 3

121 122 123
	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
124
		dht.runBootstrap(ctx, cfg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125
	}
126
	cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
func TestValueGetSet(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130 131
	// t.Skip("skipping test to debug another")

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
	ctx := context.Background()
133

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135
	dhtA := setupDHT(ctx, t)
	dhtB := setupDHT(ctx, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136

137 138
	defer dhtA.Close()
	defer dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139 140
	defer dhtA.host.Close()
	defer dhtB.host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141

142
	vf := &record.ValidChecker{
143
		Func: func(key.Key, []byte) error {
144 145 146
			return nil
		},
		Sign: false,
Jeromy's avatar
Jeromy committed
147
	}
148 149 150 151
	nulsel := func(_ key.Key, bs [][]byte) (int, error) {
		return 0, nil
	}

152 153
	dhtA.Validator["v"] = vf
	dhtB.Validator["v"] = vf
154 155
	dhtA.Selector["v"] = nulsel
	dhtB.Selector["v"] = nulsel
Jeromy's avatar
Jeromy committed
156

157
	connect(t, ctx, dhtA, dhtB)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	ctxT, _ := context.WithTimeout(ctx, time.Second)
160
	dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	ctxT, _ = context.WithTimeout(ctx, time.Second*2)
Jeromy's avatar
Jeromy committed
163
	val, err := dhtA.GetValue(ctxT, "/v/hello")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164 165 166 167 168 169 170 171
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
	ctxT, _ = context.WithTimeout(ctx, time.Second*2)
Jeromy's avatar
Jeromy committed
173
	val, err = dhtB.GetValue(ctxT, "/v/hello")
174 175 176 177 178 179 180
	if err != nil {
		t.Fatal(err)
	}

	if string(val) != "world" {
		t.Fatalf("Expected 'world' got '%s'", string(val))
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181 182
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184
func TestProvides(t *testing.T) {
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186

187
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188 189
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192 193 194
		}
	}()

195 196 197
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198

199
	for k, v := range testCaseValues {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200
		log.Debugf("adding local values for %s = %s", k, v)
Jeromy's avatar
Jeromy committed
201 202 203 204 205 206 207
		sk := dhts[3].peerstore.PrivKey(dhts[3].self)
		rec, err := record.MakePutRecord(sk, k, v, false)
		if err != nil {
			t.Fatal(err)
		}

		err = dhts[3].putLocal(k, rec)
208 209 210 211 212 213 214 215
		if err != nil {
			t.Fatal(err)
		}

		bits, err := dhts[3].getLocal(k)
		if err != nil {
			t.Fatal(err)
		}
216
		if !bytes.Equal(bits.GetValue(), v) {
217
			t.Fatalf("didn't store the right bits (%s, %s)", k, v)
218
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219 220
	}

rht's avatar
rht committed
221
	for k := range testCaseValues {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222
		log.Debugf("announcing provider for %s", k)
223 224 225
		if err := dhts[3].Provide(ctx, k); err != nil {
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227
	}

228 229 230 231
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)

	n := 0
rht's avatar
rht committed
232
	for k := range testCaseValues {
233 234
		n = (n + 1) % 3

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235
		log.Debugf("getting providers for %s from %d", k, n)
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
		ctxT, _ := context.WithTimeout(ctx, time.Second)
		provchan := dhts[n].FindProvidersAsync(ctxT, k, 1)

		select {
		case prov := <-provchan:
			if prov.ID == "" {
				t.Fatal("Got back nil provider")
			}
			if prov.ID != dhts[3].self {
				t.Fatal("Got back wrong provider")
			}
		case <-ctxT.Done():
			t.Fatal("Did not get a provider back.")
		}
	}
}

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
// if minPeers or avgPeers is 0, dont test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
				t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
280
			log.Debugf("did not reach well-formed routing tables by %s", timeout)
281 282 283 284 285 286 287 288 289 290 291
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
292
	fmt.Printf("checking routing table of %d\n", len(dhts))
293 294 295 296 297 298 299
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
300
func TestBootstrap(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301
	// t.Skip("skipping test to debug another")
302 303 304 305
	if testing.Short() {
		t.SkipNow()
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306 307
	ctx := context.Background()

308
	nDHTs := 30
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309 310 311 312
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
313
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314 315 316 317 318 319 320 321
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

322
	<-time.After(100 * time.Millisecond)
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
			t.Logf("bootstrapping them so they find each other", nDHTs)
			ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

340
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
341
	close(stop)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343 344
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
345 346 347 348 349 350
		printRoutingTables(dhts)
	}
}

func TestPeriodicBootstrap(t *testing.T) {
	// t.Skip("skipping test to debug another")
351 352 353
	if ci.IsRunning() {
		t.Skip("skipping on CI. highly timing dependent")
	}
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	nDHTs := 30
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	// signal amplifier
	amplify := func(signal chan time.Time, other []chan time.Time) {
		for t := range signal {
			for _, s := range other {
				s <- t
			}
		}
		for _, s := range other {
			close(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
378 379 380
		}
	}

381 382 383
	signal := make(chan time.Time)
	allSignals := []chan time.Time{}

384 385 386 387
	var cfg BootstrapConfig
	cfg = DefaultBootstrapConfig
	cfg.Queries = 5

388 389 390 391
	// kick off periodic bootstrappers with instrumented signals.
	for _, dht := range dhts {
		s := make(chan time.Time)
		allSignals = append(allSignals, s)
392
		dht.BootstrapOnSignal(cfg, s)
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
	}
	go amplify(signal, allSignals)

	t.Logf("dhts are not connected.", nDHTs)
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 0 {
			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
		}
	}

	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

Richard Littauer's avatar
Richard Littauer committed
408
	t.Logf("DHTs are now connected to 1-2 others.", nDHTs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
409
	for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
410
		rtlen := dht.routingTable.Size()
411 412
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414
	}
415

416 417 418 419 420 421 422 423 424
	if u.Debug {
		printRoutingTables(dhts)
	}

	t.Logf("bootstrapping them so they find each other", nDHTs)
	signal <- time.Now()

	// this is async, and we dont know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
425
	waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
426 427 428

	if u.Debug {
		printRoutingTables(dhts)
429
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430 431
}

432 433
func TestProvidesMany(t *testing.T) {
	t.Skip("this test doesn't work")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434
	// t.Skip("skipping test to debug another")
435 436 437 438 439 440 441
	ctx := context.Background()

	nDHTs := 40
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
442
			defer dhts[i].host.Close()
443 444 445 446 447 448 449 450
		}
	}()

	t.Logf("connecting %d dhts in a ring", nDHTs)
	for i := 0; i < nDHTs; i++ {
		connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
	}

451
	<-time.After(100 * time.Millisecond)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
452
	t.Logf("bootstrapping them so they find each other", nDHTs)
453
	ctxT, _ := context.WithTimeout(ctx, 20*time.Second)
454 455
	bootstrap(t, ctxT, dhts)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
456 457 458 459 460 461 462 463
	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		}
464
	}
465

466
	var providers = map[key.Key]peer.ID{}
467

468 469 470 471
	d := 0
	for k, v := range testCaseValues {
		d = (d + 1) % len(dhts)
		dht := dhts[d]
472
		providers[k] = dht.self
473 474

		t.Logf("adding local values for %s = %s (on %s)", k, v, dht.self)
Jeromy's avatar
Jeromy committed
475 476 477 478 479 480
		rec, err := record.MakePutRecord(nil, k, v, false)
		if err != nil {
			t.Fatal(err)
		}

		err = dht.putLocal(k, rec)
481 482 483 484 485 486 487 488
		if err != nil {
			t.Fatal(err)
		}

		bits, err := dht.getLocal(k)
		if err != nil {
			t.Fatal(err)
		}
489
		if !bytes.Equal(bits.GetValue(), v) {
490
			t.Fatalf("didn't store the right bits (%s, %s)", k, v)
491 492 493 494 495 496
		}

		t.Logf("announcing provider for %s", k)
		if err := dht.Provide(ctx, k); err != nil {
			t.Fatal(err)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497 498
	}

499 500
	// what is this timeout for? was 60ms before.
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
501

502 503
	errchan := make(chan error)

504
	ctxT, _ = context.WithTimeout(ctx, 5*time.Second)
505 506

	var wg sync.WaitGroup
507
	getProvider := func(dht *IpfsDHT, k key.Key) {
508
		defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
509

510 511
		expected := providers[k]

512 513 514
		provchan := dht.FindProvidersAsync(ctxT, k, 1)
		select {
		case prov := <-provchan:
515 516
			actual := prov.ID
			if actual == "" {
517
				errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
518 519 520
			} else if actual != expected {
				errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
					expected, actual, k, dht.self)
521 522 523
			}
		case <-ctxT.Done():
			errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
Jeromy's avatar
Jeromy committed
524
		}
525 526
	}

rht's avatar
rht committed
527
	for k := range testCaseValues {
528 529
		// everyone should be able to find it...
		for _, dht := range dhts {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
530
			log.Debugf("getting providers for %s at %s", k, dht.self)
531 532
			wg.Add(1)
			go getProvider(dht, k)
533
		}
534 535 536 537 538 539 540 541 542 543
	}

	// we need this because of printing errors
	go func() {
		wg.Wait()
		close(errchan)
	}()

	for err := range errchan {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
544 545 546
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
547
func TestProvidesAsync(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
548
	// t.Skip("skipping test to debug another")
549 550 551
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
552

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
553
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
554

555
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
556 557
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
558
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
559
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
560 561 562
		}
	}()

563 564 565
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
566

567
	k := key.Key("hello")
Jeromy's avatar
Jeromy committed
568 569 570 571 572 573 574 575
	val := []byte("world")
	sk := dhts[3].peerstore.PrivKey(dhts[3].self)
	rec, err := record.MakePutRecord(sk, k, val, false)
	if err != nil {
		t.Fatal(err)
	}

	err = dhts[3].putLocal(k, rec)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
576 577 578 579
	if err != nil {
		t.Fatal(err)
	}

Jeromy's avatar
Jeromy committed
580
	bits, err := dhts[3].getLocal(k)
581
	if err != nil && bytes.Equal(bits.GetValue(), val) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
582 583 584
		t.Fatal(err)
	}

585
	err = dhts[3].Provide(ctx, key.Key("hello"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
586 587 588 589 590 591
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 60)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
592
	ctxT, _ := context.WithTimeout(ctx, time.Millisecond*300)
593
	provs := dhts[0].FindProvidersAsync(ctxT, key.Key("hello"), 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
594
	select {
Jeromy's avatar
Jeromy committed
595 596 597 598
	case p, ok := <-provs:
		if !ok {
			t.Fatal("Provider channel was closed...")
		}
599
		if p.ID == "" {
Jeromy's avatar
Jeromy committed
600 601
			t.Fatal("Got back nil provider!")
		}
602
		if p.ID != dhts[3].self {
603
			t.Fatalf("got a provider, but not the right one. %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
604
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
605
	case <-ctxT.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
606 607 608 609
		t.Fatal("Didnt get back providers")
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
610
func TestLayeredGet(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
611
	// t.Skip("skipping test to debug another")
612 613 614
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
615

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616
	ctx := context.Background()
617

618
	_, _, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619 620
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
621
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
622
			defer dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
623 624 625
		}
	}()

626 627 628
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
629

630
	err := dhts[3].Provide(ctx, key.Key("/v/hello"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
631 632 633 634
	if err != nil {
		t.Fatal(err)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
635
	time.Sleep(time.Millisecond * 6)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
636

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
637
	t.Log("interface was changed. GetValue should not use providers.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
638
	ctxT, _ := context.WithTimeout(ctx, time.Second)
639
	val, err := dhts[0].GetValue(ctxT, key.Key("/v/hello"))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
640 641
	if err != routing.ErrNotFound {
		t.Error(err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
642
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
643 644 645 646 647
	if string(val) == "world" {
		t.Error("should not get value.")
	}
	if len(val) > 0 && string(val) != "world" {
		t.Error("worse, there's a value and its not even the right one.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
648 649 650 651
	}
}

func TestFindPeer(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
652
	// t.Skip("skipping test to debug another")
653 654 655
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
656

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
657
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
658

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
659
	_, peers, dhts := setupDHTS(ctx, 4, t)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
660 661
	defer func() {
		for i := 0; i < 4; i++ {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
662
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
663
			dhts[i].host.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
664 665 666
		}
	}()

667 668 669
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
670

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
671
	ctxT, _ := context.WithTimeout(ctx, time.Second)
672
	p, err := dhts[0].FindPeer(ctxT, peers[2])
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
673 674 675 676
	if err != nil {
		t.Fatal(err)
	}

677
	if p.ID == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
678 679 680
		t.Fatal("Failed to find peer.")
	}

681
	if p.ID != peers[2] {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
682 683 684
		t.Fatal("Didnt find expected peer.")
	}
}
685

686
func TestFindPeersConnectedToPeer(t *testing.T) {
687 688
	t.Skip("not quite correct (see note)")

689 690 691 692 693 694 695 696 697 698
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	_, peers, dhts := setupDHTS(ctx, 4, t)
	defer func() {
		for i := 0; i < 4; i++ {
			dhts[i].Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
699
			dhts[i].host.Close()
700 701 702 703 704
		}
	}()

	// topology:
	// 0-1, 1-2, 1-3, 2-3
705 706 707 708
	connect(t, ctx, dhts[0], dhts[1])
	connect(t, ctx, dhts[1], dhts[2])
	connect(t, ctx, dhts[1], dhts[3])
	connect(t, ctx, dhts[2], dhts[3])
709 710 711 712 713 714 715

	// fmt.Println("0 is", peers[0])
	// fmt.Println("1 is", peers[1])
	// fmt.Println("2 is", peers[2])
	// fmt.Println("3 is", peers[3])

	ctxT, _ := context.WithTimeout(ctx, time.Second)
716
	pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
717 718 719 720
	if err != nil {
		t.Fatal(err)
	}

721
	// shouldFind := []peer.ID{peers[1], peers[3]}
Jeromy's avatar
Jeromy committed
722
	var found []pstore.PeerInfo
723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
	for nextp := range pchan {
		found = append(found, nextp)
	}

	// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
	// fmt.Println("should find 1, 3", shouldFind)
	// fmt.Println("found", found)

	// testPeerListsMatch(t, shouldFind, found)

	log.Warning("TestFindPeersConnectedToPeer is not quite correct")
	if len(found) == 0 {
		t.Fatal("didn't find any peers.")
	}
}

739
func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
740 741 742 743 744 745 746 747 748

	if len(p1) != len(p2) {
		t.Fatal("did not find as many peers as should have", p1, p2)
	}

	ids1 := make([]string, len(p1))
	ids2 := make([]string, len(p2))

	for i, p := range p1 {
749
		ids1[i] = string(p)
750 751 752
	}

	for i, p := range p2 {
753
		ids2[i] = string(p)
754 755 756 757 758 759 760 761 762 763 764 765
	}

	sort.Sort(sort.StringSlice(ids1))
	sort.Sort(sort.StringSlice(ids2))

	for i := range ids1 {
		if ids1[i] != ids2[i] {
			t.Fatal("Didnt find expected peer", ids1[i], ids2)
		}
	}
}

766
func TestConnectCollision(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
767
	// t.Skip("skipping test to debug another")
768 769 770
	if testing.Short() {
		t.SkipNow()
	}
771 772 773
	if travisci.IsRunning() {
		t.Skip("Skipping on Travis-CI.")
	}
774

775
	runTimes := 10
776

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
777
	for rtime := 0; rtime < runTimes; rtime++ {
rht's avatar
rht committed
778
		log.Info("Running Time: ", rtime)
779

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
780
		ctx := context.Background()
781

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
782 783
		dhtA := setupDHT(ctx, t)
		dhtB := setupDHT(ctx, t)
784

785 786
		addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
		addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
787

788 789
		peerA := dhtA.self
		peerB := dhtB.self
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
790

791
		errs := make(chan error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
792
		go func() {
Jeromy's avatar
Jeromy committed
793 794
			dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerB}
Jeromy's avatar
Jeromy committed
795
			err := dhtA.host.Connect(ctx, pi)
796
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
797 798
		}()
		go func() {
Jeromy's avatar
Jeromy committed
799 800
			dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
			pi := pstore.PeerInfo{ID: peerA}
Jeromy's avatar
Jeromy committed
801
			err := dhtB.host.Connect(ctx, pi)
802
			errs <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
803 804
		}()

805
		timeout := time.After(5 * time.Second)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
806
		select {
807 808 809 810
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
811 812 813 814
		case <-timeout:
			t.Fatal("Timeout received!")
		}
		select {
815 816 817 818
		case e := <-errs:
			if e != nil {
				t.Fatal(e)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
819 820 821 822
		case <-timeout:
			t.Fatal("Timeout received!")
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
823 824
		dhtA.Close()
		dhtB.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
825 826
		dhtA.host.Close()
		dhtB.host.Close()
Jeromy's avatar
Jeromy committed
827
	}
828
}