bitswap_test.go 22.2 KB
Newer Older
1
package bitswap_test
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"fmt"
7
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9 10
	"testing"
	"time"

11
	bitswap "github.com/ipfs/go-bitswap"
12
	deciface "github.com/ipfs/go-bitswap/decision"
13 14
	decision "github.com/ipfs/go-bitswap/internal/decision"
	bssession "github.com/ipfs/go-bitswap/internal/session"
15
	"github.com/ipfs/go-bitswap/message"
16 17
	testinstance "github.com/ipfs/go-bitswap/testinstance"
	tn "github.com/ipfs/go-bitswap/testnet"
Jeromy's avatar
Jeromy committed
18 19 20 21 22 23 24
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	detectrace "github.com/ipfs/go-detect-race"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
25
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
26
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
27 28
	travis "github.com/libp2p/go-libp2p-testing/ci/travis"
	tu "github.com/libp2p/go-libp2p-testing/etc"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29 30
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
31 32
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
33 34
const kNetworkDelay = 0 * time.Millisecond

35 36 37 38
func getVirtualNetwork() tn.Network {
	return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

39
func TestClose(t *testing.T) {
40
	vnet := getVirtualNetwork()
41
	ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
42
	defer ig.Close()
43
	bgen := blocksutil.NewBlockGenerator()
44 45

	block := bgen.Next()
46
	bitswap := ig.Next()
47

48
	bitswap.Exchange.Close()
Steven Allen's avatar
Steven Allen committed
49 50 51 52
	_, err := bitswap.Exchange.GetBlock(context.Background(), block.Cid())
	if err == nil {
		t.Fatal("expected GetBlock to fail")
	}
53 54
}

Jeromy's avatar
Jeromy committed
55 56 57 58
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
59
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
60
	defer ig.Close()
Jeromy's avatar
Jeromy committed
61 62 63

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
Steven Allen's avatar
Steven Allen committed
64 65 66 67
	err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
68

69
	solo := ig.Next()
Jeromy's avatar
Jeromy committed
70 71
	defer solo.Exchange.Close()

rht's avatar
rht committed
72 73
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
74
	_, err = solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
75 76 77 78 79 80

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

83
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
84
	block := blocks.NewBlock([]byte("block"))
85
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
86
	defer ig.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87

88
	peers := ig.Instances(2)
89
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
90
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91

92
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
93 94
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
95

96
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
97
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98

rht's avatar
rht committed
99 100
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
101
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
106

Jeromy's avatar
Jeromy committed
107
	if !bytes.Equal(block.RawData(), received.RawData()) {
108 109
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
110 111
}

112 113 114
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
115 116
	bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
117
	defer ig.Close()
118

119
	hasBlock := ig.Next()
120 121
	defer hasBlock.Exchange.Close()

122 123 124
	wantsBlock := ig.Next()
	defer wantsBlock.Exchange.Close()

125 126 127 128
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

129
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
130 131 132 133 134 135 136 137 138 139 140 141 142 143
	defer cancel()

	ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)

	received, err := ns.GetBlock(ctx, block.Cid())
	if received != nil {
		t.Fatalf("Expected to find nothing, found %s", received)
	}

	if err != context.DeadlineExceeded {
		t.Fatal("Expected deadline exceeded")
	}
}

144 145
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
146 147 148 149 150 151 152
func TestUnwantedBlockNotAdded(t *testing.T) {

	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
	bsMessage := message.New(true)
	bsMessage.AddBlock(block)

153
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
154
	defer ig.Close()
155

156
	peers := ig.Instances(2)
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
	hasBlock := peers[0]
	defer hasBlock.Exchange.Close()

	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

	doesNotWantBlock := peers[1]
	defer doesNotWantBlock.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

172
	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
173 174 175 176 177
	if err != nil || blockInStore {
		t.Fatal("Unwanted block added to block store")
	}
}

178 179 180 181 182 183 184 185 186 187 188
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
//   (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
	ctx := context.Background()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	bg := blocksutil.NewBlockGenerator()
	sessionBroadcastWantCapacity := 4

189
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	defer ig.Close()

	instance := ig.Instances(1)[0]
	defer instance.Exchange.Close()

	oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Request enough blocks to exceed the session's broadcast want list
	// capacity (by one block). The session will put the remaining block
	// into the "tofetch" queue
	blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
	ks := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}
	outch, err := instance.Exchange.GetBlocks(ctx, ks)
	if err != nil {
		t.Fatal(err)
	}

	// Wait a little while to make sure the session has time to process the wants
	time.Sleep(time.Millisecond * 20)

	// Simulate receiving a message which contains the block in the "tofetch" queue
	lastBlock := blks[len(blks)-1]
	bsMessage := message.New(true)
	bsMessage.AddBlock(lastBlock)
	unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
	instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)

	// Make sure Bitswap adds the block to the output channel
	blkrecvd, ok := <-outch
	if !ok {
		t.Fatal("timed out waiting for block")
	}
	if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
		t.Fatal("received wrong block")
	}

	// Make sure Bitswap adds the block to the blockstore
	blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if !blockInStore {
		t.Fatal("Block was not added to block store")
	}
}

240
func TestLargeSwarm(t *testing.T) {
241 242 243
	if testing.Short() {
		t.SkipNow()
	}
244
	numInstances := 100
245
	numBlocks := 2
246 247 248
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
249
		numInstances = 20
250 251
	} else if travis.IsRunning() {
		numInstances = 200
252 253 254
	} else {
		t.Parallel()
	}
255 256
	PerformDistributionTest(t, numInstances, numBlocks)
}
257

258 259 260
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
261
	}
262 263 264 265 266

	if !travis.IsRunning() {
		t.Parallel()
	}

267 268 269
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
270 271
}

272 273 274 275 276 277 278 279 280
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

281
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
282
	ctx := context.Background()
283 284 285
	if testing.Short() {
		t.SkipNow()
	}
286
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
287
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
288
	defer ig.Close()
289
	bg := blocksutil.NewBlockGenerator()
290

291
	instances := ig.Instances(numInstances)
292 293 294 295
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

296
	var blkeys []cid.Cid
297 298
	first := instances[0]
	for _, b := range blocks {
299
		blkeys = append(blkeys, b.Cid())
Steven Allen's avatar
Steven Allen committed
300 301 302 303
		err := first.Exchange.HasBlock(b)
		if err != nil {
			t.Fatal(err)
		}
304 305 306 307
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
308
	wg := sync.WaitGroup{}
309 310
	errs := make(chan error)

311
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
312
		wg.Add(1)
313
		go func(inst testinstance.Instance) {
Jeromy's avatar
Jeromy committed
314
			defer wg.Done()
315
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
316
			if err != nil {
317
				errs <- err
Jeromy's avatar
Jeromy committed
318
			}
319
			for range outch {
Jeromy's avatar
Jeromy committed
320 321
			}
		}(inst)
322
	}
323 324 325 326 327 328 329 330 331 332 333

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
334 335 336 337 338

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
339
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
340 341 342 343 344 345
				t.Fatal(err)
			}
		}
	}
}

346
// TODO simplify this test. get to the _essence_!
347
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
348 349 350 351
	if testing.Short() {
		t.SkipNow()
	}

352
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
353
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
354
	defer ig.Close()
355
	bg := blocksutil.NewBlockGenerator()
356

357
	peers := ig.Instances(2)
358 359
	peerA := peers[0]
	peerB := peers[1]
360

361 362
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
363

364
	waitTime := time.Second * 5
365

366 367
	alpha := bg.Next()
	// peerA requests and waits for block alpha
368
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
369
	defer cancel()
370
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
371
	if err != nil {
372 373
		t.Fatal(err)
	}
374

375
	// peerB announces to the network that he has block alpha
376
	err = peerB.Exchange.HasBlock(alpha)
377
	if err != nil {
378 379
		t.Fatal(err)
	}
380

381 382 383 384
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
385
	}
386

387
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
388
		t.Fatal("Wrong block!")
389
	}
390

391
}
Jeromy's avatar
Jeromy committed
392

jbenet's avatar
jbenet committed
393 394
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
395
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
396 397
	defer ig.Close()
	bs := ig.Instances(1)[0].Exchange
jbenet's avatar
jbenet committed
398

Jeromy's avatar
Jeromy committed
399 400 401
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

402
	_, err := bs.GetBlock(ctx, cid.Cid{})
jbenet's avatar
jbenet committed
403 404 405 406 407
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

408
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
Jeromy's avatar
Jeromy committed
409
	if sblks != st.BlocksSent {
Steven Allen's avatar
Steven Allen committed
410
		t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
Jeromy's avatar
Jeromy committed
411 412 413
	}

	if rblks != st.BlocksReceived {
Steven Allen's avatar
Steven Allen committed
414
		t.Errorf("mismatch in blocks recvd: %d vs %d", rblks, st.BlocksReceived)
Jeromy's avatar
Jeromy committed
415 416 417
	}

	if sdata != st.DataSent {
Steven Allen's avatar
Steven Allen committed
418
		t.Errorf("mismatch in data sent: %d vs %d", sdata, st.DataSent)
Jeromy's avatar
Jeromy committed
419 420 421
	}

	if rdata != st.DataReceived {
Steven Allen's avatar
Steven Allen committed
422
		t.Errorf("mismatch in data recvd: %d vs %d", rdata, st.DataReceived)
Jeromy's avatar
Jeromy committed
423 424 425
	}
}

Jeromy's avatar
Jeromy committed
426
func TestBasicBitswap(t *testing.T) {
427
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
428
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
429
	defer ig.Close()
Jeromy's avatar
Jeromy committed
430 431
	bg := blocksutil.NewBlockGenerator()

432
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
433

434
	instances := ig.Instances(3)
Jeromy's avatar
Jeromy committed
435
	blocks := bg.Blocks(1)
436 437

	// First peer has block
438
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
439 440 441 442
	if err != nil {
		t.Fatal(err)
	}

443
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
444
	defer cancel()
445 446 447

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
448
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
449 450 451 452
	if err != nil {
		t.Fatal(err)
	}

453 454
	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
455 456 457 458 459 460 461 462 463 464
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
Jeromy's avatar
Jeromy committed
465 466
	}

Jeromy's avatar
Jeromy committed
467 468 469 470 471 472 473 474 475 476
	st0, err := instances[0].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

	st1, err := instances[1].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
477 478
	st2, err := instances[2].Exchange.Stat()
	if err != nil {
Jeromy's avatar
Jeromy committed
479 480 481
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
482 483 484 485 486 487 488 489 490
	t.Log("stat node 0")
	assertStat(t, st0, 1, 0, uint64(len(blk.RawData())), 0)
	t.Log("stat node 1")
	assertStat(t, st1, 0, 1, 0, uint64(len(blk.RawData())))
	t.Log("stat node 2")
	assertStat(t, st2, 0, 0, 0, 0)

	if !bytes.Equal(blk.RawData(), blocks[0].RawData()) {
		t.Errorf("blocks aren't equal: expected %v, actual %v", blocks[0].RawData(), blk.RawData())
Jeromy's avatar
Jeromy committed
491 492
	}

Jeromy's avatar
Jeromy committed
493 494 495 496 497 498 499 500
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
501 502 503

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
504
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
505
	defer ig.Close()
Jeromy's avatar
Jeromy committed
506 507 508 509
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

510
	instances := ig.Instances(2)
Jeromy's avatar
Jeromy committed
511 512
	blocks := bg.Blocks(1)

Jeromy's avatar
Jeromy committed
513 514 515
	// NOTE: A race condition can happen here where these GetBlocks requests go
	// through before the peers even get connected. This is okay, bitswap
	// *should* be able to handle this.
Jeromy's avatar
Jeromy committed
516
	ctx1, cancel1 := context.WithCancel(context.Background())
517
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
518 519 520 521 522 523 524
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

525
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
526 527 528 529
	if err != nil {
		t.Fatal(err)
	}

530
	// ensure both requests make it into the wantlist at the same time
Jeromy's avatar
Jeromy committed
531
	time.Sleep(time.Millisecond * 20)
Jeromy's avatar
Jeromy committed
532 533 534 535 536 537 538 539 540 541 542 543
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

544 545 546 547 548 549 550
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
Jeromy's avatar
Jeromy committed
551 552 553 554 555 556 557 558
		p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
		if len(p1wl) != 1 {
			t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
		} else if !p1wl[0].Equals(blocks[0].Cid()) {
			t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
		} else {
			t.Log("had correct wantlist, somehow")
		}
559
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
560 561 562 563 564 565 566 567 568
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
569 570 571

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
572
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
573
	defer ig.Close()
574 575
	bg := blocksutil.NewBlockGenerator()

dirkmc's avatar
dirkmc committed
576 577 578
	instances := ig.Instances(2)
	instance := instances[0]
	bswap := instance.Exchange
579 580
	blocks := bg.Blocks(20)

581
	var keys []cid.Cid
582
	for _, b := range blocks {
583
		keys = append(keys, b.Cid())
584 585
	}

dirkmc's avatar
dirkmc committed
586
	// Once context times out, key should be removed from wantlist
587 588 589 590 591 592 593 594 595
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
596
	if len(bswap.GetWantHaves()) > 0 {
597 598 599
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
600
	// Once context times out, keys should be removed from wantlist
601 602 603 604 605 606 607 608 609 610
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
611
	if len(bswap.GetWantHaves()) > 0 {
612 613 614
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
615
	// Send want for single block, with no timeout
616 617 618 619 620
	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
621
	// Send want for 10 blocks
622 623 624 625 626 627
	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
628 629
	// Even after 50 milli-seconds we haven't explicitly cancelled anything
	// and no timeouts have expired, so we should have 11 want-haves
630
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
631 632
	if len(bswap.GetWantHaves()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
633 634
	}

dirkmc's avatar
dirkmc committed
635 636
	// Cancel the timeout for the request for 10 blocks. This should remove
	// the want-haves
637
	cancel()
dirkmc's avatar
dirkmc committed
638 639

	// Once the cancel is processed, we are left with the request for 1 block
640
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
641
	if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
642 643 644
		t.Fatal("should only have keys[0] in wantlist")
	}
}
645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661

func assertLedgerMatch(ra, rb *decision.Receipt) error {
	if ra.Sent != rb.Recv {
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
	}

	if ra.Recv != rb.Sent {
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
	}

	if ra.Exchanged != rb.Exchanged {
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}

	return nil
}

662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
func assertLedgerEqual(ra, rb *decision.Receipt) error {
	if ra.Value != rb.Value {
		return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
	}

	if ra.Sent != rb.Sent {
		return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
	}

	if ra.Recv != rb.Recv {
		return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
	}

	if ra.Exchanged != rb.Exchanged {
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}

	return nil
}

func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
	return &decision.Receipt{
		Peer:      "test",
		Value:     float64(sent) / (1 + float64(recv)),
		Sent:      sent,
		Recv:      recv,
		Exchanged: exchanged,
	}
}

func TestBitswapLedgerOneWay(t *testing.T) {
693
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
694
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
695
	defer ig.Close()
696 697 698 699
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when one peer sends block to another")

700
	instances := ig.Instances(2)
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

717
	// compare peer ledger receipts
718 719 720 721 722
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

723 724 725 726 727 728 729 730 731 732 733 734
	// check that receipts have intended values
	ratest := newReceipt(1, 0, 1)
	err = assertLedgerEqual(ratest, ra)
	if err != nil {
		t.Fatal(err)
	}
	rbtest := newReceipt(0, 1, 1)
	err = assertLedgerEqual(rbtest, rb)
	if err != nil {
		t.Fatal(err)
	}

735 736 737 738 739 740 741 742 743
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}

744
func TestBitswapLedgerTwoWay(t *testing.T) {
745
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
746
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
747
	defer ig.Close()
748 749 750 751
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when two peers send one block to each other")

752
	instances := ig.Instances(2)
753 754 755 756 757 758 759 760 761 762 763 764 765
	blocks := bg.Blocks(2)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	err = instances[1].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
766
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
767 768 769 770 771 772
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
773
	blk, err := instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
774 775 776 777 778 779 780
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

781
	// compare peer ledger receipts
782 783 784 785 786
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

787 788 789 790 791 792 793 794 795 796 797 798
	// check that receipts have intended values
	rtest := newReceipt(1, 1, 2)
	err = assertLedgerEqual(rtest, ra)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLedgerEqual(rtest, rb)
	if err != nil {
		t.Fatal(err)
	}

799 800 801 802 803 804 805 806
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862

type testingScoreLedger struct {
	scorePeer deciface.ScorePeerFunc
	started   chan struct{}
	closed    chan struct{}
}

func newTestingScoreLedger() *testingScoreLedger {
	return &testingScoreLedger{
		nil,
		make(chan struct{}),
		make(chan struct{}),
	}
}

func (tsl *testingScoreLedger) GetReceipt(p peer.ID) *deciface.Receipt {
	return nil
}
func (tsl *testingScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
func (tsl *testingScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}
func (tsl *testingScoreLedger) PeerConnected(p peer.ID)             {}
func (tsl *testingScoreLedger) PeerDisconnected(p peer.ID)          {}
func (tsl *testingScoreLedger) Start(scorePeer deciface.ScorePeerFunc) {
	tsl.scorePeer = scorePeer
	close(tsl.started)
}
func (tsl *testingScoreLedger) Stop() {
	close(tsl.closed)
}

// Tests start and stop of a custom decision logic
func TestWithScoreLedger(t *testing.T) {
	tsl := newTestingScoreLedger()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
	defer ig.Close()
	i := ig.Next()
	defer i.Exchange.Close()

	select {
	case <-tsl.started:
		if tsl.scorePeer == nil {
			t.Fatal("Expected the score function to be initialized")
		}
	case <-time.After(time.Second * 5):
		t.Fatal("Expected the score ledger to be started within 5s")
	}

	i.Exchange.Close()
	select {
	case <-tsl.closed:
	case <-time.After(time.Second * 5):
		t.Fatal("Expected the score ledger to be closed within 5s")
	}
}