bitswap_test.go 20.6 KB
Newer Older
1
package bitswap_test
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"fmt"
7
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9 10
	"testing"
	"time"

11
	bitswap "github.com/ipfs/go-bitswap"
12 13 14 15
	decision "github.com/ipfs/go-bitswap/internal/decision"
	bssession "github.com/ipfs/go-bitswap/internal/session"
	testinstance "github.com/ipfs/go-bitswap/internal/testinstance"
	tn "github.com/ipfs/go-bitswap/internal/testnet"
16
	"github.com/ipfs/go-bitswap/message"
Jeromy's avatar
Jeromy committed
17 18 19 20 21 22 23
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	detectrace "github.com/ipfs/go-detect-race"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
24
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
25
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
26 27
	travis "github.com/libp2p/go-libp2p-testing/ci/travis"
	tu "github.com/libp2p/go-libp2p-testing/etc"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28 29
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
// FIXME: the tests are really sensitive to the network delay. Fix them to
// work well under varying conditions.
//
// kNetworkDelay is the fixed delay passed to the virtual networks created by
// the tests in this file.
const kNetworkDelay = 0 * time.Millisecond

34 35 36 37
// getVirtualNetwork returns a virtual test network built on a fresh mock
// routing server and configured with a fixed delay of kNetworkDelay.
func getVirtualNetwork() tn.Network {
	routing := mockrouting.NewServer()
	latency := delay.Fixed(kNetworkDelay)
	return tn.VirtualNetwork(routing, latency)
}

38
func TestClose(t *testing.T) {
39
	vnet := getVirtualNetwork()
40
	ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
41
	defer ig.Close()
42
	bgen := blocksutil.NewBlockGenerator()
43 44

	block := bgen.Next()
45
	bitswap := ig.Next()
46

47
	bitswap.Exchange.Close()
Steven Allen's avatar
Steven Allen committed
48 49 50 51
	_, err := bitswap.Exchange.GetBlock(context.Background(), block.Cid())
	if err == nil {
		t.Fatal("expected GetBlock to fail")
	}
52 53
}

Jeromy's avatar
Jeromy committed
54 55 56 57
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
58
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
59
	defer ig.Close()
Jeromy's avatar
Jeromy committed
60 61 62

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
Steven Allen's avatar
Steven Allen committed
63 64 65 66
	err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
67

68
	solo := ig.Next()
Jeromy's avatar
Jeromy committed
69 70
	defer solo.Exchange.Close()

rht's avatar
rht committed
71 72
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
73
	_, err = solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
74 75 76 77 78 79

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

82
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
83
	block := blocks.NewBlock([]byte("block"))
84
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
85
	defer ig.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86

87
	peers := ig.Instances(2)
88
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
89
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90

91
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
92 93
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94

95
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
96
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97

rht's avatar
rht committed
98 99
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
100
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
101 102 103 104
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
105

Jeromy's avatar
Jeromy committed
106
	if !bytes.Equal(block.RawData(), received.RawData()) {
107 108
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110
}

111 112 113
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
114 115
	bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
116
	defer ig.Close()
117

118
	hasBlock := ig.Next()
119 120
	defer hasBlock.Exchange.Close()

121 122 123
	wantsBlock := ig.Next()
	defer wantsBlock.Exchange.Close()

124 125 126 127
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

128
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
129 130 131 132 133 134 135 136 137 138 139 140 141 142
	defer cancel()

	ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)

	received, err := ns.GetBlock(ctx, block.Cid())
	if received != nil {
		t.Fatalf("Expected to find nothing, found %s", received)
	}

	if err != context.DeadlineExceeded {
		t.Fatal("Expected deadline exceeded")
	}
}

143 144
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
145 146 147 148 149 150 151
func TestUnwantedBlockNotAdded(t *testing.T) {

	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
	bsMessage := message.New(true)
	bsMessage.AddBlock(block)

152
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
153
	defer ig.Close()
154

155
	peers := ig.Instances(2)
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
	hasBlock := peers[0]
	defer hasBlock.Exchange.Close()

	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

	doesNotWantBlock := peers[1]
	defer doesNotWantBlock.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

171
	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
172 173 174 175 176
	if err != nil || blockInStore {
		t.Fatal("Unwanted block added to block store")
	}
}

177 178 179 180 181 182 183 184 185 186 187
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
//   (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
	ctx := context.Background()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	bg := blocksutil.NewBlockGenerator()
	sessionBroadcastWantCapacity := 4

188
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
	defer ig.Close()

	instance := ig.Instances(1)[0]
	defer instance.Exchange.Close()

	oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Request enough blocks to exceed the session's broadcast want list
	// capacity (by one block). The session will put the remaining block
	// into the "tofetch" queue
	blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
	ks := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}
	outch, err := instance.Exchange.GetBlocks(ctx, ks)
	if err != nil {
		t.Fatal(err)
	}

	// Wait a little while to make sure the session has time to process the wants
	time.Sleep(time.Millisecond * 20)

	// Simulate receiving a message which contains the block in the "tofetch" queue
	lastBlock := blks[len(blks)-1]
	bsMessage := message.New(true)
	bsMessage.AddBlock(lastBlock)
	unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
	instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)

	// Make sure Bitswap adds the block to the output channel
	blkrecvd, ok := <-outch
	if !ok {
		t.Fatal("timed out waiting for block")
	}
	if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
		t.Fatal("received wrong block")
	}

	// Make sure Bitswap adds the block to the blockstore
	blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if !blockInStore {
		t.Fatal("Block was not added to block store")
	}
}

239
func TestLargeSwarm(t *testing.T) {
240 241 242
	if testing.Short() {
		t.SkipNow()
	}
243
	numInstances := 100
244
	numBlocks := 2
245 246 247
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
248
		numInstances = 20
249 250
	} else if travis.IsRunning() {
		numInstances = 200
251 252 253
	} else {
		t.Parallel()
	}
254 255
	PerformDistributionTest(t, numInstances, numBlocks)
}
256

257 258 259
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
260
	}
261 262 263 264 265

	if !travis.IsRunning() {
		t.Parallel()
	}

266 267 268
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
269 270
}

271 272 273 274 275 276 277 278 279
// TestLargeFileTwoPeers runs the distribution test with 2 instances and 100
// blocks. It is skipped in short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		numInstances = 2
		numBlocks    = 100
	)
	PerformDistributionTest(t, numInstances, numBlocks)
}

280
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
281
	ctx := context.Background()
282 283 284
	if testing.Short() {
		t.SkipNow()
	}
285
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
286
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
287
	defer ig.Close()
288
	bg := blocksutil.NewBlockGenerator()
289

290
	instances := ig.Instances(numInstances)
291 292 293 294
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

295
	var blkeys []cid.Cid
296 297
	first := instances[0]
	for _, b := range blocks {
298
		blkeys = append(blkeys, b.Cid())
Steven Allen's avatar
Steven Allen committed
299 300 301 302
		err := first.Exchange.HasBlock(b)
		if err != nil {
			t.Fatal(err)
		}
303 304 305 306
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
307
	wg := sync.WaitGroup{}
308 309
	errs := make(chan error)

310
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
311
		wg.Add(1)
312
		go func(inst testinstance.Instance) {
Jeromy's avatar
Jeromy committed
313
			defer wg.Done()
314
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
315
			if err != nil {
316
				errs <- err
Jeromy's avatar
Jeromy committed
317
			}
318
			for range outch {
Jeromy's avatar
Jeromy committed
319 320
			}
		}(inst)
321
	}
322 323 324 325 326 327 328 329 330 331 332

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
333 334 335 336 337

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
338
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
339 340 341 342 343 344
				t.Fatal(err)
			}
		}
	}
}

345
// TODO simplify this test. get to the _essence_!
346
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
347 348 349 350
	if testing.Short() {
		t.SkipNow()
	}

351
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
352
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
353
	defer ig.Close()
354
	bg := blocksutil.NewBlockGenerator()
355

356
	peers := ig.Instances(2)
357 358
	peerA := peers[0]
	peerB := peers[1]
359

360 361
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
362

363
	waitTime := time.Second * 5
364

365 366
	alpha := bg.Next()
	// peerA requests and waits for block alpha
367
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
368
	defer cancel()
369
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
370
	if err != nil {
371 372
		t.Fatal(err)
	}
373

374
	// peerB announces to the network that he has block alpha
375
	err = peerB.Exchange.HasBlock(alpha)
376
	if err != nil {
377 378
		t.Fatal(err)
	}
379

380 381 382 383
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
384
	}
385

386
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
387
		t.Fatal("Wrong block!")
388
	}
389

390
}
Jeromy's avatar
Jeromy committed
391

jbenet's avatar
jbenet committed
392 393
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
394
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
395 396
	defer ig.Close()
	bs := ig.Instances(1)[0].Exchange
jbenet's avatar
jbenet committed
397

Jeromy's avatar
Jeromy committed
398 399 400
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

401
	_, err := bs.GetBlock(ctx, cid.Cid{})
jbenet's avatar
jbenet committed
402 403 404 405 406
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

407
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
Jeromy's avatar
Jeromy committed
408
	if sblks != st.BlocksSent {
Steven Allen's avatar
Steven Allen committed
409
		t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
Jeromy's avatar
Jeromy committed
410 411 412
	}

	if rblks != st.BlocksReceived {
Steven Allen's avatar
Steven Allen committed
413
		t.Errorf("mismatch in blocks recvd: %d vs %d", rblks, st.BlocksReceived)
Jeromy's avatar
Jeromy committed
414 415 416
	}

	if sdata != st.DataSent {
Steven Allen's avatar
Steven Allen committed
417
		t.Errorf("mismatch in data sent: %d vs %d", sdata, st.DataSent)
Jeromy's avatar
Jeromy committed
418 419 420
	}

	if rdata != st.DataReceived {
Steven Allen's avatar
Steven Allen committed
421
		t.Errorf("mismatch in data recvd: %d vs %d", rdata, st.DataReceived)
Jeromy's avatar
Jeromy committed
422 423 424
	}
}

Jeromy's avatar
Jeromy committed
425
func TestBasicBitswap(t *testing.T) {
426
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
427
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
428
	defer ig.Close()
Jeromy's avatar
Jeromy committed
429 430
	bg := blocksutil.NewBlockGenerator()

431
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
432

433
	instances := ig.Instances(3)
Jeromy's avatar
Jeromy committed
434
	blocks := bg.Blocks(1)
435 436

	// First peer has block
437
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
438 439 440 441
	if err != nil {
		t.Fatal(err)
	}

442
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
443
	defer cancel()
444 445 446

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
447
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
448 449 450 451
	if err != nil {
		t.Fatal(err)
	}

452 453
	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
454 455 456 457 458 459 460 461 462 463
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
Jeromy's avatar
Jeromy committed
464 465
	}

Jeromy's avatar
Jeromy committed
466 467 468 469 470 471 472 473 474 475
	st0, err := instances[0].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

	st1, err := instances[1].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
476 477
	st2, err := instances[2].Exchange.Stat()
	if err != nil {
Jeromy's avatar
Jeromy committed
478 479 480
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
481 482 483 484 485 486 487 488 489
	t.Log("stat node 0")
	assertStat(t, st0, 1, 0, uint64(len(blk.RawData())), 0)
	t.Log("stat node 1")
	assertStat(t, st1, 0, 1, 0, uint64(len(blk.RawData())))
	t.Log("stat node 2")
	assertStat(t, st2, 0, 0, 0, 0)

	if !bytes.Equal(blk.RawData(), blocks[0].RawData()) {
		t.Errorf("blocks aren't equal: expected %v, actual %v", blocks[0].RawData(), blk.RawData())
Jeromy's avatar
Jeromy committed
490 491
	}

Jeromy's avatar
Jeromy committed
492 493 494 495 496 497 498 499
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
500 501 502

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
503
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
504
	defer ig.Close()
Jeromy's avatar
Jeromy committed
505 506 507 508
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

509
	instances := ig.Instances(2)
Jeromy's avatar
Jeromy committed
510 511
	blocks := bg.Blocks(1)

Jeromy's avatar
Jeromy committed
512 513 514
	// NOTE: A race condition can happen here where these GetBlocks requests go
	// through before the peers even get connected. This is okay, bitswap
	// *should* be able to handle this.
Jeromy's avatar
Jeromy committed
515
	ctx1, cancel1 := context.WithCancel(context.Background())
516
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
517 518 519 520 521 522 523
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

524
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
525 526 527 528
	if err != nil {
		t.Fatal(err)
	}

529
	// ensure both requests make it into the wantlist at the same time
Jeromy's avatar
Jeromy committed
530
	time.Sleep(time.Millisecond * 20)
Jeromy's avatar
Jeromy committed
531 532 533 534 535 536 537 538 539 540 541 542
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

543 544 545 546 547 548 549
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
Jeromy's avatar
Jeromy committed
550 551 552 553 554 555 556 557
		p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
		if len(p1wl) != 1 {
			t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
		} else if !p1wl[0].Equals(blocks[0].Cid()) {
			t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
		} else {
			t.Log("had correct wantlist, somehow")
		}
558
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
559 560 561 562 563 564 565 566 567
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
568 569 570

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
571
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
572
	defer ig.Close()
573 574
	bg := blocksutil.NewBlockGenerator()

dirkmc's avatar
dirkmc committed
575 576 577
	instances := ig.Instances(2)
	instance := instances[0]
	bswap := instance.Exchange
578 579
	blocks := bg.Blocks(20)

580
	var keys []cid.Cid
581
	for _, b := range blocks {
582
		keys = append(keys, b.Cid())
583 584
	}

dirkmc's avatar
dirkmc committed
585
	// Once context times out, key should be removed from wantlist
586 587 588 589 590 591 592 593 594
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
595
	if len(bswap.GetWantHaves()) > 0 {
596 597 598
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
599
	// Once context times out, keys should be removed from wantlist
600 601 602 603 604 605 606 607 608 609
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
610
	if len(bswap.GetWantHaves()) > 0 {
611 612 613
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
614
	// Send want for single block, with no timeout
615 616 617 618 619
	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
620
	// Send want for 10 blocks
621 622 623 624 625 626
	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
627 628
	// Even after 50 milli-seconds we haven't explicitly cancelled anything
	// and no timeouts have expired, so we should have 11 want-haves
629
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
630 631
	if len(bswap.GetWantHaves()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
632 633
	}

dirkmc's avatar
dirkmc committed
634 635
	// Cancel the timeout for the request for 10 blocks. This should remove
	// the want-haves
636
	cancel()
dirkmc's avatar
dirkmc committed
637 638

	// Once the cancel is processed, we are left with the request for 1 block
639
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
640
	if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
641 642 643
		t.Fatal("should only have keys[0] in wantlist")
	}
}
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660

// assertLedgerMatch checks that two receipts mirror each other: what ra sent
// equals what rb received and vice versa, and both report the same Exchanged
// count. It returns an error describing the first mismatch, or nil.
func assertLedgerMatch(ra, rb *decision.Receipt) error {
	switch {
	case ra.Sent != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
	case ra.Recv != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}
	return nil
}

661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691
// assertLedgerEqual checks that two receipts carry the same Value, Sent, Recv
// and Exchanged fields. It returns an error describing the first field that
// differs, or nil when all four agree.
func assertLedgerEqual(ra, rb *decision.Receipt) error {
	switch {
	case ra.Value != rb.Value:
		return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
	case ra.Sent != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
	case ra.Recv != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}
	return nil
}

// newReceipt builds the receipt a test expects for the given counters. Peer
// is always "test" and Value is computed as sent / (1 + recv).
func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
	r := new(decision.Receipt)
	r.Peer = "test"
	r.Value = float64(sent) / (1 + float64(recv))
	r.Sent = sent
	r.Recv = recv
	r.Exchanged = exchanged
	return r
}

func TestBitswapLedgerOneWay(t *testing.T) {
692
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
693
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
694
	defer ig.Close()
695 696 697 698
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when one peer sends block to another")

699
	instances := ig.Instances(2)
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

716
	// compare peer ledger receipts
717 718 719 720 721
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

722 723 724 725 726 727 728 729 730 731 732 733
	// check that receipts have intended values
	ratest := newReceipt(1, 0, 1)
	err = assertLedgerEqual(ratest, ra)
	if err != nil {
		t.Fatal(err)
	}
	rbtest := newReceipt(0, 1, 1)
	err = assertLedgerEqual(rbtest, rb)
	if err != nil {
		t.Fatal(err)
	}

734 735 736 737 738 739 740 741 742
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}

743
func TestBitswapLedgerTwoWay(t *testing.T) {
744
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
745
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
746
	defer ig.Close()
747 748 749 750
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when two peers send one block to each other")

751
	instances := ig.Instances(2)
752 753 754 755 756 757 758 759 760 761 762 763 764
	blocks := bg.Blocks(2)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	err = instances[1].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
765
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
766 767 768 769 770 771
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
772
	blk, err := instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
773 774 775 776 777 778 779
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

780
	// compare peer ledger receipts
781 782 783 784 785
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

786 787 788 789 790 791 792 793 794 795 796 797
	// check that receipts have intended values
	rtest := newReceipt(1, 1, 2)
	err = assertLedgerEqual(rtest, ra)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLedgerEqual(rtest, rb)
	if err != nil {
		t.Fatal(err)
	}

798 799 800 801 802 803 804 805
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}