bitswap_test.go 17.6 KB
Newer Older
1
package bitswap_test
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"fmt"
7
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9 10
	"testing"
	"time"

11
	bitswap "github.com/ipfs/go-bitswap"
Jeromy's avatar
Jeromy committed
12
	decision "github.com/ipfs/go-bitswap/decision"
13
	"github.com/ipfs/go-bitswap/message"
14
	bssession "github.com/ipfs/go-bitswap/session"
15
	testinstance "github.com/ipfs/go-bitswap/testinstance"
Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23 24
	tn "github.com/ipfs/go-bitswap/testnet"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	detectrace "github.com/ipfs/go-detect-race"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
25 26
	travis "github.com/libp2p/go-libp2p-testing/ci/travis"
	tu "github.com/libp2p/go-libp2p-testing/etc"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
27 28
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
29 30
// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
31 32
const kNetworkDelay = 0 * time.Millisecond

33 34 35 36
func getVirtualNetwork() tn.Network {
	return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

37
func TestClose(t *testing.T) {
38
	vnet := getVirtualNetwork()
39 40
	ig := testinstance.NewTestInstanceGenerator(vnet)
	defer ig.Close()
41
	bgen := blocksutil.NewBlockGenerator()
42 43

	block := bgen.Next()
44
	bitswap := ig.Next()
45

46
	bitswap.Exchange.Close()
47
	bitswap.Exchange.GetBlock(context.Background(), block.Cid())
48 49
}

Jeromy's avatar
Jeromy committed
50 51 52 53
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
54 55
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
56 57 58

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
59
	rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
Jeromy's avatar
Jeromy committed
60

61
	solo := ig.Next()
Jeromy's avatar
Jeromy committed
62 63
	defer solo.Exchange.Close()

rht's avatar
rht committed
64 65
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
66
	_, err := solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
67 68 69 70 71 72

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
73 74
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

75
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
76
	block := blocks.NewBlock([]byte("block"))
77 78
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
79

80
	peers := ig.Instances(2)
81
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
82
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83

84
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
85 86
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87

88
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
89
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90

rht's avatar
rht committed
91 92
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
93
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96 97
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
98

Jeromy's avatar
Jeromy committed
99
	if !bytes.Equal(block.RawData(), received.RawData()) {
100 101
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103
}

104 105 106
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
107
	ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
108
	defer ig.Close()
109

110
	hasBlock := ig.Next()
111 112
	defer hasBlock.Exchange.Close()

113 114 115
	wantsBlock := ig.Next()
	defer wantsBlock.Exchange.Close()

116 117 118 119
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

120
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
121 122 123 124 125 126 127 128 129 130 131 132 133 134
	defer cancel()

	ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)

	received, err := ns.GetBlock(ctx, block.Cid())
	if received != nil {
		t.Fatalf("Expected to find nothing, found %s", received)
	}

	if err != context.DeadlineExceeded {
		t.Fatal("Expected deadline exceeded")
	}
}

135 136 137 138 139 140 141
func TestUnwantedBlockNotAdded(t *testing.T) {

	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
	bsMessage := message.New(true)
	bsMessage.AddBlock(block)

142 143
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
144

145
	peers := ig.Instances(2)
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
	hasBlock := peers[0]
	defer hasBlock.Exchange.Close()

	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

	doesNotWantBlock := peers[1]
	defer doesNotWantBlock.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

161
	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
162 163 164 165 166
	if err != nil || blockInStore {
		t.Fatal("Unwanted block added to block store")
	}
}

167
func TestLargeSwarm(t *testing.T) {
168 169 170
	if testing.Short() {
		t.SkipNow()
	}
171
	numInstances := 100
172
	numBlocks := 2
173 174 175
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
176
		numInstances = 50
177 178
	} else if travis.IsRunning() {
		numInstances = 200
179 180 181
	} else {
		t.Parallel()
	}
182 183
	PerformDistributionTest(t, numInstances, numBlocks)
}
184

185 186 187
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
188
	}
189 190 191 192 193

	if !travis.IsRunning() {
		t.Parallel()
	}

194 195 196
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
197 198
}

199 200 201 202 203 204 205 206 207
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	numInstances := 2
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
}

208
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
209
	ctx := context.Background()
210 211 212
	if testing.Short() {
		t.SkipNow()
	}
213
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
214 215
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
216
	bg := blocksutil.NewBlockGenerator()
217

218
	instances := ig.Instances(numInstances)
219 220 221 222
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

223
	var blkeys []cid.Cid
224 225
	first := instances[0]
	for _, b := range blocks {
226
		blkeys = append(blkeys, b.Cid())
227
		first.Exchange.HasBlock(b)
228 229 230 231
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
232
	wg := sync.WaitGroup{}
233 234
	errs := make(chan error)

235
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
236
		wg.Add(1)
237
		go func(inst testinstance.Instance) {
Jeromy's avatar
Jeromy committed
238
			defer wg.Done()
239
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
240
			if err != nil {
241
				errs <- err
Jeromy's avatar
Jeromy committed
242
			}
243
			for range outch {
Jeromy's avatar
Jeromy committed
244 245
			}
		}(inst)
246
	}
247 248 249 250 251 252 253 254 255 256 257

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
258 259 260 261 262

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
263
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
264 265 266 267 268 269
				t.Fatal(err)
			}
		}
	}
}

270
// TODO simplify this test. get to the _essence_!
271
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
272 273 274 275
	if testing.Short() {
		t.SkipNow()
	}

276
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
277 278
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
279
	bg := blocksutil.NewBlockGenerator()
280

281
	peers := ig.Instances(2)
282 283
	peerA := peers[0]
	peerB := peers[1]
284

285 286
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
287

288
	waitTime := time.Second * 5
289

290 291
	alpha := bg.Next()
	// peerA requests and waits for block alpha
292
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
293
	defer cancel()
294
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
295
	if err != nil {
296 297
		t.Fatal(err)
	}
298

299
	// peerB announces to the network that he has block alpha
300
	err = peerB.Exchange.HasBlock(alpha)
301
	if err != nil {
302 303
		t.Fatal(err)
	}
304

305 306 307 308
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
309
	}
310

311
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
312
		t.Fatal("Wrong block!")
313
	}
314

315
}
Jeromy's avatar
Jeromy committed
316

jbenet's avatar
jbenet committed
317 318
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
319 320 321
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
	bs := ig.Instances(1)[0].Exchange
jbenet's avatar
jbenet committed
322

Jeromy's avatar
Jeromy committed
323 324 325
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

326
	_, err := bs.GetBlock(ctx, cid.Cid{})
jbenet's avatar
jbenet committed
327 328 329 330 331
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

332
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
Jeromy's avatar
Jeromy committed
333
	if sblks != st.BlocksSent {
Steven Allen's avatar
Steven Allen committed
334
		t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
Jeromy's avatar
Jeromy committed
335 336 337
	}

	if rblks != st.BlocksReceived {
Steven Allen's avatar
Steven Allen committed
338
		t.Errorf("mismatch in blocks recvd: %d vs %d", rblks, st.BlocksReceived)
Jeromy's avatar
Jeromy committed
339 340 341
	}

	if sdata != st.DataSent {
Steven Allen's avatar
Steven Allen committed
342
		t.Errorf("mismatch in data sent: %d vs %d", sdata, st.DataSent)
Jeromy's avatar
Jeromy committed
343 344 345
	}

	if rdata != st.DataReceived {
Steven Allen's avatar
Steven Allen committed
346
		t.Errorf("mismatch in data recvd: %d vs %d", rdata, st.DataReceived)
Jeromy's avatar
Jeromy committed
347 348 349
	}
}

Jeromy's avatar
Jeromy committed
350
func TestBasicBitswap(t *testing.T) {
351
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
352 353
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
354 355
	bg := blocksutil.NewBlockGenerator()

356
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
357

358
	instances := ig.Instances(3)
Jeromy's avatar
Jeromy committed
359
	blocks := bg.Blocks(1)
360 361

	// First peer has block
362
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
363 364 365 366
	if err != nil {
		t.Fatal(err)
	}

367
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
368
	defer cancel()
369 370 371

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
372
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
373 374 375 376
	if err != nil {
		t.Fatal(err)
	}

377 378
	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
379 380 381 382 383 384 385 386 387 388
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
Jeromy's avatar
Jeromy committed
389 390
	}

Jeromy's avatar
Jeromy committed
391 392 393 394 395 396 397 398 399 400
	st0, err := instances[0].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

	st1, err := instances[1].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
401 402
	st2, err := instances[2].Exchange.Stat()
	if err != nil {
Jeromy's avatar
Jeromy committed
403 404 405
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
406 407 408 409 410 411 412 413 414
	t.Log("stat node 0")
	assertStat(t, st0, 1, 0, uint64(len(blk.RawData())), 0)
	t.Log("stat node 1")
	assertStat(t, st1, 0, 1, 0, uint64(len(blk.RawData())))
	t.Log("stat node 2")
	assertStat(t, st2, 0, 0, 0, 0)

	if !bytes.Equal(blk.RawData(), blocks[0].RawData()) {
		t.Errorf("blocks aren't equal: expected %v, actual %v", blocks[0].RawData(), blk.RawData())
Jeromy's avatar
Jeromy committed
415 416
	}

Jeromy's avatar
Jeromy committed
417 418 419 420 421 422 423 424
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
425 426 427

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
428 429
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
430 431 432 433
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

434
	instances := ig.Instances(2)
Jeromy's avatar
Jeromy committed
435 436
	blocks := bg.Blocks(1)

Jeromy's avatar
Jeromy committed
437 438 439
	// NOTE: A race condition can happen here where these GetBlocks requests go
	// through before the peers even get connected. This is okay, bitswap
	// *should* be able to handle this.
Jeromy's avatar
Jeromy committed
440
	ctx1, cancel1 := context.WithCancel(context.Background())
441
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
442 443 444 445 446 447 448
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

449
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
450 451 452 453
	if err != nil {
		t.Fatal(err)
	}

454
	// ensure both requests make it into the wantlist at the same time
Jeromy's avatar
Jeromy committed
455
	time.Sleep(time.Millisecond * 20)
Jeromy's avatar
Jeromy committed
456 457 458 459 460 461 462 463 464 465 466 467
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

468 469 470 471 472 473 474
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
Jeromy's avatar
Jeromy committed
475 476 477 478 479 480 481 482
		p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
		if len(p1wl) != 1 {
			t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
		} else if !p1wl[0].Equals(blocks[0].Cid()) {
			t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
		} else {
			t.Log("had correct wantlist, somehow")
		}
483
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
484 485 486 487 488 489 490 491 492
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
493 494 495

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
496 497
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
498 499
	bg := blocksutil.NewBlockGenerator()

500
	instances := ig.Instances(1)[0]
501 502 503
	bswap := instances.Exchange
	blocks := bg.Blocks(20)

504
	var keys []cid.Cid
505
	for _, b := range blocks {
506
		keys = append(keys, b.Cid())
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

	if len(bswap.GetWantlist()) > 0 {
		t.Fatal("should not have anyting in wantlist")
	}

	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(time.Millisecond * 50)
548 549
	if len(bswap.GetWantlist()) != 5 {
		t.Fatal("should have 5 keys in wantlist")
550 551 552 553 554 555 556 557
	}

	cancel()
	time.Sleep(time.Millisecond * 50)
	if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
		t.Fatal("should only have keys[0] in wantlist")
	}
}
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574

func assertLedgerMatch(ra, rb *decision.Receipt) error {
	if ra.Sent != rb.Recv {
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
	}

	if ra.Recv != rb.Sent {
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
	}

	if ra.Exchanged != rb.Exchanged {
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}

	return nil
}

575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
func assertLedgerEqual(ra, rb *decision.Receipt) error {
	if ra.Value != rb.Value {
		return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
	}

	if ra.Sent != rb.Sent {
		return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
	}

	if ra.Recv != rb.Recv {
		return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
	}

	if ra.Exchanged != rb.Exchanged {
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}

	return nil
}

func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
	return &decision.Receipt{
		Peer:      "test",
		Value:     float64(sent) / (1 + float64(recv)),
		Sent:      sent,
		Recv:      recv,
		Exchanged: exchanged,
	}
}

func TestBitswapLedgerOneWay(t *testing.T) {
606
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
607 608
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
609 610 611 612
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when one peer sends block to another")

613
	instances := ig.Instances(2)
614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

630
	// compare peer ledger receipts
631 632 633 634 635
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

636 637 638 639 640 641 642 643 644 645 646 647
	// check that receipts have intended values
	ratest := newReceipt(1, 0, 1)
	err = assertLedgerEqual(ratest, ra)
	if err != nil {
		t.Fatal(err)
	}
	rbtest := newReceipt(0, 1, 1)
	err = assertLedgerEqual(rbtest, rb)
	if err != nil {
		t.Fatal(err)
	}

648 649 650 651 652 653 654 655 656
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}

657
func TestBitswapLedgerTwoWay(t *testing.T) {
658
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
659 660
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
661 662 663 664
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when two peers send one block to each other")

665
	instances := ig.Instances(2)
666 667 668 669 670 671 672 673 674 675 676 677 678
	blocks := bg.Blocks(2)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	err = instances[1].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
679
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
680 681 682 683 684 685
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
686
	blk, err := instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
687 688 689 690 691 692 693
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

694
	// compare peer ledger receipts
695 696 697 698 699
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

700 701 702 703 704 705 706 707 708 709 710 711
	// check that receipts have intended values
	rtest := newReceipt(1, 1, 2)
	err = assertLedgerEqual(rtest, ra)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLedgerEqual(rtest, rb)
	if err != nil {
		t.Fatal(err)
	}

712 713 714 715 716 717 718 719
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}