bitswap_test.go 20.4 KB
Newer Older
1
package bitswap_test
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"fmt"
7
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9 10
	"testing"
	"time"

11
	bitswap "github.com/ipfs/go-bitswap"
Jeromy's avatar
Jeromy committed
12
	decision "github.com/ipfs/go-bitswap/decision"
13
	"github.com/ipfs/go-bitswap/message"
14
	bssession "github.com/ipfs/go-bitswap/session"
15
	testinstance "github.com/ipfs/go-bitswap/testinstance"
Jeromy's avatar
Jeromy committed
16 17 18 19 20 21 22 23
	tn "github.com/ipfs/go-bitswap/testnet"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	detectrace "github.com/ipfs/go-detect-race"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
24
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
25
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
26 27
	travis "github.com/libp2p/go-libp2p-testing/ci/travis"
	tu "github.com/libp2p/go-libp2p-testing/etc"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28 29
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
// FIXME: the tests are really sensitive to the network delay. Fix them to
// work well under varying conditions.
32 33
const kNetworkDelay = 0 * time.Millisecond

34 35 36 37
// getVirtualNetwork constructs a virtual test network from a new mock routing
// server and a fixed delay of kNetworkDelay.
func getVirtualNetwork() tn.Network {
	routing := mockrouting.NewServer()
	latency := delay.Fixed(kNetworkDelay)
	return tn.VirtualNetwork(routing, latency)
}

38
func TestClose(t *testing.T) {
39
	vnet := getVirtualNetwork()
40 41
	ig := testinstance.NewTestInstanceGenerator(vnet)
	defer ig.Close()
42
	bgen := blocksutil.NewBlockGenerator()
43 44

	block := bgen.Next()
45
	bitswap := ig.Next()
46

47
	bitswap.Exchange.Close()
Steven Allen's avatar
Steven Allen committed
48 49 50 51
	_, err := bitswap.Exchange.GetBlock(context.Background(), block.Cid())
	if err == nil {
		t.Fatal("expected GetBlock to fail")
	}
52 53
}

Jeromy's avatar
Jeromy committed
54 55 56 57
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
58 59
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
60 61 62

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
Steven Allen's avatar
Steven Allen committed
63 64 65 66
	err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
67

68
	solo := ig.Next()
Jeromy's avatar
Jeromy committed
69 70
	defer solo.Exchange.Close()

rht's avatar
rht committed
71 72
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
73
	_, err = solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
74 75 76 77 78 79

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
80 81
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

82
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
83
	block := blocks.NewBlock([]byte("block"))
84 85
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86

87
	peers := ig.Instances(2)
88
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
89
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90

91
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
92 93
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94

95
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
96
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97

rht's avatar
rht committed
98 99
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
100
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
101 102 103 104
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
105

Jeromy's avatar
Jeromy committed
106
	if !bytes.Equal(block.RawData(), received.RawData()) {
107 108
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110
}

111 112 113
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
114
	ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
115
	defer ig.Close()
116

117
	hasBlock := ig.Next()
118 119
	defer hasBlock.Exchange.Close()

120 121 122
	wantsBlock := ig.Next()
	defer wantsBlock.Exchange.Close()

123 124 125 126
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

127
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
128 129 130 131 132 133 134 135 136 137 138 139 140 141
	defer cancel()

	ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)

	received, err := ns.GetBlock(ctx, block.Cid())
	if received != nil {
		t.Fatalf("Expected to find nothing, found %s", received)
	}

	if err != context.DeadlineExceeded {
		t.Fatal("Expected deadline exceeded")
	}
}

142 143
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
144 145 146 147 148 149 150
func TestUnwantedBlockNotAdded(t *testing.T) {

	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
	bsMessage := message.New(true)
	bsMessage.AddBlock(block)

151 152
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
153

154
	peers := ig.Instances(2)
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
	hasBlock := peers[0]
	defer hasBlock.Exchange.Close()

	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

	doesNotWantBlock := peers[1]
	defer doesNotWantBlock.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

170
	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
171 172 173 174 175
	if err != nil || blockInStore {
		t.Fatal("Unwanted block added to block store")
	}
}

176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
//   (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
	ctx := context.Background()
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	bg := blocksutil.NewBlockGenerator()
	const sessionBroadcastWantCapacity = 4

	ig := testinstance.NewTestInstanceGenerator(vnet)
	defer ig.Close()

	instance := ig.Instances(1)[0]
	defer instance.Exchange.Close()

	oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Request enough blocks to exceed the session's broadcast want list
	// capacity (by one block). The session will put the remaining block
	// into the "tofetch" queue
	blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
	ks := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}
	outch, err := instance.Exchange.GetBlocks(ctx, ks)
	if err != nil {
		t.Fatal(err)
	}

	// Wait a little while to make sure the session has time to process the wants
	time.Sleep(time.Millisecond * 20)

	// Simulate receiving a message which contains the block in the "tofetch" queue
	lastBlock := blks[len(blks)-1]
	bsMessage := message.New(true)
	bsMessage.AddBlock(lastBlock)
	unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
	instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)

	// Make sure Bitswap adds the block to the output channel. GetBlocks was
	// issued on a context that never expires, so bound the wait with
	// oneSecCtx; otherwise a missing block would hang the test instead of
	// failing it.
	select {
	case blkrecvd, ok := <-outch:
		if !ok {
			t.Fatal("output channel closed before the block was delivered")
		}
		if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
			t.Fatal("received wrong block")
		}
	case <-oneSecCtx.Done():
		t.Fatal("timed out waiting for block")
	}

	// Make sure Bitswap adds the block to the blockstore
	blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if !blockInStore {
		t.Fatal("Block was not added to block store")
	}
}

238
func TestLargeSwarm(t *testing.T) {
239 240 241
	if testing.Short() {
		t.SkipNow()
	}
242
	numInstances := 100
243
	numBlocks := 2
244 245 246
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
247
		numInstances = 50
248 249
	} else if travis.IsRunning() {
		numInstances = 200
250 251 252
	} else {
		t.Parallel()
	}
253 254
	PerformDistributionTest(t, numInstances, numBlocks)
}
255

256 257 258
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
259
	}
260 261 262 263 264

	if !travis.IsRunning() {
		t.Parallel()
	}

265 266 267
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
268 269
}

270 271 272 273 274 275 276 277 278
// TestLargeFileTwoPeers runs the distribution test with 100 blocks between
// exactly two instances. It is skipped in short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		numInstances = 2
		numBlocks    = 100
	)
	PerformDistributionTest(t, numInstances, numBlocks)
}

279
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
280
	ctx := context.Background()
281 282 283
	if testing.Short() {
		t.SkipNow()
	}
284
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
285 286
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
287
	bg := blocksutil.NewBlockGenerator()
288

289
	instances := ig.Instances(numInstances)
290 291 292 293
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

294
	var blkeys []cid.Cid
295 296
	first := instances[0]
	for _, b := range blocks {
297
		blkeys = append(blkeys, b.Cid())
Steven Allen's avatar
Steven Allen committed
298 299 300 301
		err := first.Exchange.HasBlock(b)
		if err != nil {
			t.Fatal(err)
		}
302 303 304 305
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
306
	wg := sync.WaitGroup{}
307 308
	errs := make(chan error)

309
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
310
		wg.Add(1)
311
		go func(inst testinstance.Instance) {
Jeromy's avatar
Jeromy committed
312
			defer wg.Done()
313
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
314
			if err != nil {
315
				errs <- err
Jeromy's avatar
Jeromy committed
316
			}
317
			for range outch {
Jeromy's avatar
Jeromy committed
318 319
			}
		}(inst)
320
	}
321 322 323 324 325 326 327 328 329 330 331

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
332 333 334 335 336

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
337
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
338 339 340 341 342 343
				t.Fatal(err)
			}
		}
	}
}

344
// TODO simplify this test. get to the _essence_!
345
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
346 347 348 349
	if testing.Short() {
		t.SkipNow()
	}

350
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
351 352
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
353
	bg := blocksutil.NewBlockGenerator()
354

355
	peers := ig.Instances(2)
356 357
	peerA := peers[0]
	peerB := peers[1]
358

359 360
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
361

362
	waitTime := time.Second * 5
363

364 365
	alpha := bg.Next()
	// peerA requests and waits for block alpha
366
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
367
	defer cancel()
368
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
369
	if err != nil {
370 371
		t.Fatal(err)
	}
372

373
	// peerB announces to the network that he has block alpha
374
	err = peerB.Exchange.HasBlock(alpha)
375
	if err != nil {
376 377
		t.Fatal(err)
	}
378

379 380 381 382
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
383
	}
384

385
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
386
		t.Fatal("Wrong block!")
387
	}
388

389
}
Jeromy's avatar
Jeromy committed
390

jbenet's avatar
jbenet committed
391 392
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
393 394 395
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
	bs := ig.Instances(1)[0].Exchange
jbenet's avatar
jbenet committed
396

Jeromy's avatar
Jeromy committed
397 398 399
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

400
	_, err := bs.GetBlock(ctx, cid.Cid{})
jbenet's avatar
jbenet committed
401 402 403 404 405
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

406
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
Jeromy's avatar
Jeromy committed
407
	if sblks != st.BlocksSent {
Steven Allen's avatar
Steven Allen committed
408
		t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
Jeromy's avatar
Jeromy committed
409 410 411
	}

	if rblks != st.BlocksReceived {
Steven Allen's avatar
Steven Allen committed
412
		t.Errorf("mismatch in blocks recvd: %d vs %d", rblks, st.BlocksReceived)
Jeromy's avatar
Jeromy committed
413 414 415
	}

	if sdata != st.DataSent {
Steven Allen's avatar
Steven Allen committed
416
		t.Errorf("mismatch in data sent: %d vs %d", sdata, st.DataSent)
Jeromy's avatar
Jeromy committed
417 418 419
	}

	if rdata != st.DataReceived {
Steven Allen's avatar
Steven Allen committed
420
		t.Errorf("mismatch in data recvd: %d vs %d", rdata, st.DataReceived)
Jeromy's avatar
Jeromy committed
421 422 423
	}
}

Jeromy's avatar
Jeromy committed
424
func TestBasicBitswap(t *testing.T) {
425
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
426 427
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
428 429
	bg := blocksutil.NewBlockGenerator()

430
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
431

432
	instances := ig.Instances(3)
Jeromy's avatar
Jeromy committed
433
	blocks := bg.Blocks(1)
434 435

	// First peer has block
436
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
437 438 439 440
	if err != nil {
		t.Fatal(err)
	}

441
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
442
	defer cancel()
443 444 445

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
446
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
447 448 449 450
	if err != nil {
		t.Fatal(err)
	}

451 452
	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
453 454 455 456 457 458 459 460 461 462
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
Jeromy's avatar
Jeromy committed
463 464
	}

Jeromy's avatar
Jeromy committed
465 466 467 468 469 470 471 472 473 474
	st0, err := instances[0].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

	st1, err := instances[1].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
475 476
	st2, err := instances[2].Exchange.Stat()
	if err != nil {
Jeromy's avatar
Jeromy committed
477 478 479
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
480 481 482 483 484 485 486 487 488
	t.Log("stat node 0")
	assertStat(t, st0, 1, 0, uint64(len(blk.RawData())), 0)
	t.Log("stat node 1")
	assertStat(t, st1, 0, 1, 0, uint64(len(blk.RawData())))
	t.Log("stat node 2")
	assertStat(t, st2, 0, 0, 0, 0)

	if !bytes.Equal(blk.RawData(), blocks[0].RawData()) {
		t.Errorf("blocks aren't equal: expected %v, actual %v", blocks[0].RawData(), blk.RawData())
Jeromy's avatar
Jeromy committed
489 490
	}

Jeromy's avatar
Jeromy committed
491 492 493 494 495 496 497 498
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
499 500 501

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
502 503
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
Jeromy's avatar
Jeromy committed
504 505 506 507
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

508
	instances := ig.Instances(2)
Jeromy's avatar
Jeromy committed
509 510
	blocks := bg.Blocks(1)

Jeromy's avatar
Jeromy committed
511 512 513
	// NOTE: A race condition can happen here where these GetBlocks requests go
	// through before the peers even get connected. This is okay, bitswap
	// *should* be able to handle this.
Jeromy's avatar
Jeromy committed
514
	ctx1, cancel1 := context.WithCancel(context.Background())
515
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
516 517 518 519 520 521 522
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

523
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
524 525 526 527
	if err != nil {
		t.Fatal(err)
	}

528
	// ensure both requests make it into the wantlist at the same time
Jeromy's avatar
Jeromy committed
529
	time.Sleep(time.Millisecond * 20)
Jeromy's avatar
Jeromy committed
530 531 532 533 534 535 536 537 538 539 540 541
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

542 543 544 545 546 547 548
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
Jeromy's avatar
Jeromy committed
549 550 551 552 553 554 555 556
		p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
		if len(p1wl) != 1 {
			t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
		} else if !p1wl[0].Equals(blocks[0].Cid()) {
			t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
		} else {
			t.Log("had correct wantlist, somehow")
		}
557
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
558 559 560 561 562 563 564 565 566
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
567 568 569

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
570 571
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
572 573
	bg := blocksutil.NewBlockGenerator()

dirkmc's avatar
dirkmc committed
574 575 576
	instances := ig.Instances(2)
	instance := instances[0]
	bswap := instance.Exchange
577 578
	blocks := bg.Blocks(20)

579
	var keys []cid.Cid
580
	for _, b := range blocks {
581
		keys = append(keys, b.Cid())
582 583
	}

dirkmc's avatar
dirkmc committed
584
	// Once context times out, key should be removed from wantlist
585 586 587 588 589 590 591 592 593
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
594
	if len(bswap.GetWantHaves()) > 0 {
595 596 597
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
598
	// Once context times out, keys should be removed from wantlist
599 600 601 602 603 604 605 606 607 608
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
609
	if len(bswap.GetWantHaves()) > 0 {
610 611 612
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
613
	// Send want for single block, with no timeout
614 615 616 617 618
	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
619
	// Send want for 10 blocks
620 621 622 623 624 625
	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
626 627
	// Even after 50 milli-seconds we haven't explicitly cancelled anything
	// and no timeouts have expired, so we should have 11 want-haves
628
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
629 630
	if len(bswap.GetWantHaves()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
631 632
	}

dirkmc's avatar
dirkmc committed
633 634
	// Cancel the timeout for the request for 10 blocks. This should remove
	// the want-haves
635
	cancel()
dirkmc's avatar
dirkmc committed
636 637

	// Once the cancel is processed, we are left with the request for 1 block
638
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
639
	if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
640 641 642
		t.Fatal("should only have keys[0] in wantlist")
	}
}
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659

// assertLedgerMatch checks that two receipts mirror each other: ra.Sent must
// equal rb.Recv, ra.Recv must equal rb.Sent, and both must report the same
// Exchanged count. It returns an error describing the first mismatch found,
// or nil when all three hold.
func assertLedgerMatch(ra, rb *decision.Receipt) error {
	switch {
	case ra.Sent != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
	case ra.Recv != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}
	return nil
}

660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
// assertLedgerEqual checks that two receipts carry identical Value, Sent, Recv
// and Exchanged fields, in that order. It returns an error describing the
// first field that differs, or nil when all four are equal.
func assertLedgerEqual(ra, rb *decision.Receipt) error {
	switch {
	case ra.Value != rb.Value:
		return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
	case ra.Sent != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
	case ra.Recv != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	}
	return nil
}

// newReceipt builds an expected receipt for peer "test" from the given sent,
// recv and exchanged counts. Value is computed as sent / (1 + recv).
func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
	r := new(decision.Receipt)
	r.Peer = "test"
	r.Value = float64(sent) / (1 + float64(recv))
	r.Sent = sent
	r.Recv = recv
	r.Exchanged = exchanged
	return r
}

func TestBitswapLedgerOneWay(t *testing.T) {
691
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
692 693
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
694 695 696 697
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when one peer sends block to another")

698
	instances := ig.Instances(2)
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

715
	// compare peer ledger receipts
716 717 718 719 720
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

721 722 723 724 725 726 727 728 729 730 731 732
	// check that receipts have intended values
	ratest := newReceipt(1, 0, 1)
	err = assertLedgerEqual(ratest, ra)
	if err != nil {
		t.Fatal(err)
	}
	rbtest := newReceipt(0, 1, 1)
	err = assertLedgerEqual(rbtest, rb)
	if err != nil {
		t.Fatal(err)
	}

733 734 735 736 737 738 739 740 741
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}

742
func TestBitswapLedgerTwoWay(t *testing.T) {
743
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
744 745
	ig := testinstance.NewTestInstanceGenerator(net)
	defer ig.Close()
746 747 748 749
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when two peers send one block to each other")

750
	instances := ig.Instances(2)
751 752 753 754 755 756 757 758 759 760 761 762 763
	blocks := bg.Blocks(2)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	err = instances[1].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
764
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
765 766 767 768 769 770
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
771
	blk, err := instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
772 773 774 775 776 777 778
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

779
	// compare peer ledger receipts
780 781 782 783 784
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

785 786 787 788 789 790 791 792 793 794 795 796
	// check that receipts have intended values
	rtest := newReceipt(1, 1, 2)
	err = assertLedgerEqual(rtest, ra)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLedgerEqual(rtest, rb)
	if err != nil {
		t.Fatal(err)
	}

797 798 799 800 801 802 803 804
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}