bitswap_test.go 26.2 KB
Newer Older
1
package bitswap_test
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2 3

import (
4
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"fmt"
7
	"sync"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8 9 10
	"testing"
	"time"

11
	bitswap "github.com/ipfs/go-bitswap"
12
	deciface "github.com/ipfs/go-bitswap/decision"
13 14
	decision "github.com/ipfs/go-bitswap/internal/decision"
	bssession "github.com/ipfs/go-bitswap/internal/session"
Tomasz Zdybał's avatar
Tomasz Zdybał committed
15 16
	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
17 18
	testinstance "github.com/ipfs/go-bitswap/testinstance"
	tn "github.com/ipfs/go-bitswap/testnet"
Jeromy's avatar
Jeromy committed
19 20 21 22 23 24 25
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	detectrace "github.com/ipfs/go-detect-race"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
26
	peer "github.com/libp2p/go-libp2p-core/peer"
Jeromy's avatar
Jeromy committed
27
	p2ptestutil "github.com/libp2p/go-libp2p-netutil"
28 29
	travis "github.com/libp2p/go-libp2p-testing/ci/travis"
	tu "github.com/libp2p/go-libp2p-testing/etc"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33
// FIXME: the tests are very sensitive to the network delay. Fix them so that
// they work well under varying conditions.
34 35
const kNetworkDelay = 0 * time.Millisecond

36 37 38 39
// getVirtualNetwork builds a virtual test network from a new mock routing
// server and a fixed delay of kNetworkDelay.
func getVirtualNetwork() tn.Network {
	routing := mockrouting.NewServer()
	latency := delay.Fixed(kNetworkDelay)
	return tn.VirtualNetwork(routing, latency)
}

40
func TestClose(t *testing.T) {
41
	vnet := getVirtualNetwork()
42
	ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
43
	defer ig.Close()
44
	bgen := blocksutil.NewBlockGenerator()
45 46

	block := bgen.Next()
47
	bitswap := ig.Next()
48

49
	bitswap.Exchange.Close()
Steven Allen's avatar
Steven Allen committed
50 51 52 53
	_, err := bitswap.Exchange.GetBlock(context.Background(), block.Cid())
	if err == nil {
		t.Fatal("expected GetBlock to fail")
	}
54 55
}

Jeromy's avatar
Jeromy committed
56 57 58 59
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this

	rs := mockrouting.NewServer()
	net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
60
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
61
	defer ig.Close()
Jeromy's avatar
Jeromy committed
62 63 64

	block := blocks.NewBlock([]byte("block"))
	pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
Steven Allen's avatar
Steven Allen committed
65 66 67 68
	err := rs.Client(pinfo).Provide(context.Background(), block.Cid(), true) // but not on network
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
69

70
	solo := ig.Next()
Jeromy's avatar
Jeromy committed
71 72
	defer solo.Exchange.Close()

rht's avatar
rht committed
73 74
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
Steven Allen's avatar
Steven Allen committed
75
	_, err = solo.Exchange.GetBlock(ctx, block.Cid())
Jeromy's avatar
Jeromy committed
76 77 78 79 80 81

	if err != context.DeadlineExceeded {
		t.Fatal("Expected DeadlineExceeded error")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {

84
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
85
	block := blocks.NewBlock([]byte("block"))
86
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
87
	defer ig.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88

89
	peers := ig.Instances(2)
90
	hasBlock := peers[0]
Jeromy's avatar
Jeromy committed
91
	defer hasBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92

93
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
94 95
		t.Fatal(err)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96

97
	wantsBlock := peers[1]
Jeromy's avatar
Jeromy committed
98
	defer wantsBlock.Exchange.Close()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99

rht's avatar
rht committed
100 101
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
102
	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
103 104 105 106
	if err != nil {
		t.Log(err)
		t.Fatal("Expected to succeed")
	}
107

Jeromy's avatar
Jeromy committed
108
	if !bytes.Equal(block.RawData(), received.RawData()) {
109 110
		t.Fatal("Data doesn't match")
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112
}

113 114 115
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
116 117
	bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
118
	defer ig.Close()
119

120
	hasBlock := ig.Next()
121 122
	defer hasBlock.Exchange.Close()

123 124 125
	wantsBlock := ig.Next()
	defer wantsBlock.Exchange.Close()

126 127 128 129
	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

130
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
131 132 133 134 135 136 137 138 139 140 141 142 143 144
	defer cancel()

	ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)

	received, err := ns.GetBlock(ctx, block.Cid())
	if received != nil {
		t.Fatalf("Expected to find nothing, found %s", received)
	}

	if err != context.DeadlineExceeded {
		t.Fatal("Expected deadline exceeded")
	}
}

145 146
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
147 148 149 150
func TestUnwantedBlockNotAdded(t *testing.T) {

	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	block := blocks.NewBlock([]byte("block"))
Cory Schwartz's avatar
Cory Schwartz committed
151
	bsMessage := bsmsg.New(true)
152 153
	bsMessage.AddBlock(block)

154
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
155
	defer ig.Close()
156

157
	peers := ig.Instances(2)
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
	hasBlock := peers[0]
	defer hasBlock.Exchange.Close()

	if err := hasBlock.Exchange.HasBlock(block); err != nil {
		t.Fatal(err)
	}

	doesNotWantBlock := peers[1]
	defer doesNotWantBlock.Exchange.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)

173
	blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
174 175 176 177 178
	if err != nil || blockInStore {
		t.Fatal("Unwanted block added to block store")
	}
}

179 180 181 182 183 184 185 186 187 188 189
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
//   (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
	ctx := context.Background()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	bg := blocksutil.NewBlockGenerator()
	sessionBroadcastWantCapacity := 4

190
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
	defer ig.Close()

	instance := ig.Instances(1)[0]
	defer instance.Exchange.Close()

	oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Request enough blocks to exceed the session's broadcast want list
	// capacity (by one block). The session will put the remaining block
	// into the "tofetch" queue
	blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
	ks := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}
	outch, err := instance.Exchange.GetBlocks(ctx, ks)
	if err != nil {
		t.Fatal(err)
	}

	// Wait a little while to make sure the session has time to process the wants
	time.Sleep(time.Millisecond * 20)

	// Simulate receiving a message which contains the block in the "tofetch" queue
	lastBlock := blks[len(blks)-1]
Cory Schwartz's avatar
Cory Schwartz committed
217
	bsMessage := bsmsg.New(true)
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
	bsMessage.AddBlock(lastBlock)
	unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
	instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)

	// Make sure Bitswap adds the block to the output channel
	blkrecvd, ok := <-outch
	if !ok {
		t.Fatal("timed out waiting for block")
	}
	if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
		t.Fatal("received wrong block")
	}

	// Make sure Bitswap adds the block to the blockstore
	blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if !blockInStore {
		t.Fatal("Block was not added to block store")
	}
}

241
func TestLargeSwarm(t *testing.T) {
242 243 244
	if testing.Short() {
		t.SkipNow()
	}
245
	numInstances := 100
246
	numBlocks := 2
247 248 249
	if detectrace.WithRace() {
		// when running with the race detector, 500 instances launches
		// well over 8k goroutines. This hits a race detector limit.
250
		numInstances = 20
251 252
	} else if travis.IsRunning() {
		numInstances = 200
253 254 255
	} else {
		t.Parallel()
	}
256 257
	PerformDistributionTest(t, numInstances, numBlocks)
}
258

259 260 261
func TestLargeFile(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
262
	}
263 264 265 266 267

	if !travis.IsRunning() {
		t.Parallel()
	}

268 269 270
	numInstances := 10
	numBlocks := 100
	PerformDistributionTest(t, numInstances, numBlocks)
271 272
}

273 274 275 276 277 278 279 280 281
// TestLargeFileTwoPeers runs the distribution test with 2 instances and 100
// blocks. It is skipped in -short mode.
func TestLargeFileTwoPeers(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	const (
		numInstances = 2
		numBlocks    = 100
	)
	PerformDistributionTest(t, numInstances, numBlocks)
}

282
func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
283
	ctx := context.Background()
284 285 286
	if testing.Short() {
		t.SkipNow()
	}
287
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
288
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
289
	defer ig.Close()
290
	bg := blocksutil.NewBlockGenerator()
291

292
	instances := ig.Instances(numInstances)
293 294 295 296
	blocks := bg.Blocks(numBlocks)

	t.Log("Give the blocks to the first instance")

297
	var blkeys []cid.Cid
298 299
	first := instances[0]
	for _, b := range blocks {
300
		blkeys = append(blkeys, b.Cid())
Steven Allen's avatar
Steven Allen committed
301 302 303 304
		err := first.Exchange.HasBlock(b)
		if err != nil {
			t.Fatal(err)
		}
305 306 307 308
	}

	t.Log("Distribute!")

Jeromy's avatar
Jeromy committed
309
	wg := sync.WaitGroup{}
310 311
	errs := make(chan error)

312
	for _, inst := range instances[1:] {
Jeromy's avatar
Jeromy committed
313
		wg.Add(1)
314
		go func(inst testinstance.Instance) {
Jeromy's avatar
Jeromy committed
315
			defer wg.Done()
316
			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
Jeromy's avatar
Jeromy committed
317
			if err != nil {
318
				errs <- err
Jeromy's avatar
Jeromy committed
319
			}
320
			for range outch {
Jeromy's avatar
Jeromy committed
321 322
			}
		}(inst)
323
	}
324 325 326 327 328 329 330 331 332 333 334

	go func() {
		wg.Wait()
		close(errs)
	}()

	for err := range errs {
		if err != nil {
			t.Fatal(err)
		}
	}
335 336 337 338 339

	t.Log("Verify!")

	for _, inst := range instances {
		for _, b := range blocks {
340
			if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
341 342 343 344 345 346
				t.Fatal(err)
			}
		}
	}
}

347
// TODO simplify this test. get to the _essence_!
348
func TestSendToWantingPeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
349 350 351 352
	if testing.Short() {
		t.SkipNow()
	}

353
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
354
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
355
	defer ig.Close()
356
	bg := blocksutil.NewBlockGenerator()
357

358
	peers := ig.Instances(2)
359 360
	peerA := peers[0]
	peerB := peers[1]
361

362 363
	t.Logf("Session %v\n", peerA.Peer)
	t.Logf("Session %v\n", peerB.Peer)
364

365
	waitTime := time.Second * 5
366

367 368
	alpha := bg.Next()
	// peerA requests and waits for block alpha
369
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
rht's avatar
rht committed
370
	defer cancel()
371
	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()})
372
	if err != nil {
373 374
		t.Fatal(err)
	}
375

376
	// peerB announces to the network that he has block alpha
377
	err = peerB.Exchange.HasBlock(alpha)
378
	if err != nil {
379 380
		t.Fatal(err)
	}
381

382 383 384 385
	// At some point, peerA should get alpha (or timeout)
	blkrecvd, ok := <-alphaPromise
	if !ok {
		t.Fatal("context timed out and broke promise channel!")
386
	}
387

388
	if !blkrecvd.Cid().Equals(alpha.Cid()) {
389
		t.Fatal("Wrong block!")
390
	}
391

392
}
Jeromy's avatar
Jeromy committed
393

jbenet's avatar
jbenet committed
394 395
func TestEmptyKey(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
396
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
397 398
	defer ig.Close()
	bs := ig.Instances(1)[0].Exchange
jbenet's avatar
jbenet committed
399

Jeromy's avatar
Jeromy committed
400 401 402
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

403
	_, err := bs.GetBlock(ctx, cid.Cid{})
jbenet's avatar
jbenet committed
404 405 406 407 408
	if err != blockstore.ErrNotFound {
		t.Error("empty str key should return ErrNotFound")
	}
}

409
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
Jeromy's avatar
Jeromy committed
410
	if sblks != st.BlocksSent {
Steven Allen's avatar
Steven Allen committed
411
		t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
Jeromy's avatar
Jeromy committed
412 413 414
	}

	if rblks != st.BlocksReceived {
Steven Allen's avatar
Steven Allen committed
415
		t.Errorf("mismatch in blocks recvd: %d vs %d", rblks, st.BlocksReceived)
Jeromy's avatar
Jeromy committed
416 417 418
	}

	if sdata != st.DataSent {
Steven Allen's avatar
Steven Allen committed
419
		t.Errorf("mismatch in data sent: %d vs %d", sdata, st.DataSent)
Jeromy's avatar
Jeromy committed
420 421 422
	}

	if rdata != st.DataReceived {
Steven Allen's avatar
Steven Allen committed
423
		t.Errorf("mismatch in data recvd: %d vs %d", rdata, st.DataReceived)
Jeromy's avatar
Jeromy committed
424 425 426
	}
}

Jeromy's avatar
Jeromy committed
427
func TestBasicBitswap(t *testing.T) {
428
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
429
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
430
	defer ig.Close()
Jeromy's avatar
Jeromy committed
431 432
	bg := blocksutil.NewBlockGenerator()

433
	t.Log("Test a one node trying to get one block from another")
Jeromy's avatar
Jeromy committed
434

435
	instances := ig.Instances(3)
Jeromy's avatar
Jeromy committed
436
	blocks := bg.Blocks(1)
437 438

	// First peer has block
439
	err := instances[0].Exchange.HasBlock(blocks[0])
Jeromy's avatar
Jeromy committed
440 441 442 443
	if err != nil {
		t.Fatal(err)
	}

444
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rht's avatar
rht committed
445
	defer cancel()
446 447 448

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
449
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
Jeromy's avatar
Jeromy committed
450 451 452 453
	if err != nil {
		t.Fatal(err)
	}

454 455
	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
456 457 458 459 460 461 462 463 464 465
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
Jeromy's avatar
Jeromy committed
466 467
	}

Jeromy's avatar
Jeromy committed
468 469 470 471 472 473 474 475 476
	st0, err := instances[0].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}
	st1, err := instances[1].Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
477 478
	st2, err := instances[2].Exchange.Stat()
	if err != nil {
Jeromy's avatar
Jeromy committed
479 480 481
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
482 483 484 485 486 487 488 489 490
	t.Log("stat node 0")
	assertStat(t, st0, 1, 0, uint64(len(blk.RawData())), 0)
	t.Log("stat node 1")
	assertStat(t, st1, 0, 1, 0, uint64(len(blk.RawData())))
	t.Log("stat node 2")
	assertStat(t, st2, 0, 0, 0, 0)

	if !bytes.Equal(blk.RawData(), blocks[0].RawData()) {
		t.Errorf("blocks aren't equal: expected %v, actual %v", blocks[0].RawData(), blk.RawData())
Jeromy's avatar
Jeromy committed
491 492
	}

Jeromy's avatar
Jeromy committed
493 494 495 496 497 498 499 500
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
Jeromy's avatar
Jeromy committed
501 502 503

func TestDoubleGet(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
504
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
505
	defer ig.Close()
Jeromy's avatar
Jeromy committed
506 507 508 509
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test a one node trying to get one block from another")

510
	instances := ig.Instances(2)
Jeromy's avatar
Jeromy committed
511 512
	blocks := bg.Blocks(1)

Jeromy's avatar
Jeromy committed
513 514 515
	// NOTE: A race condition can happen here where these GetBlocks requests go
	// through before the peers even get connected. This is okay, bitswap
	// *should* be able to handle this.
Jeromy's avatar
Jeromy committed
516
	ctx1, cancel1 := context.WithCancel(context.Background())
517
	blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
518 519 520 521 522 523 524
	if err != nil {
		t.Fatal(err)
	}

	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()

525
	blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()})
Jeromy's avatar
Jeromy committed
526 527 528 529
	if err != nil {
		t.Fatal(err)
	}

530
	// ensure both requests make it into the wantlist at the same time
Jeromy's avatar
Jeromy committed
531
	time.Sleep(time.Millisecond * 20)
Jeromy's avatar
Jeromy committed
532 533 534 535 536 537 538 539 540 541 542 543
	cancel1()

	_, ok := <-blkch1
	if ok {
		t.Fatal("expected channel to be closed")
	}

	err = instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

544 545 546 547 548 549 550
	select {
	case blk, ok := <-blkch2:
		if !ok {
			t.Fatal("expected to get the block here")
		}
		t.Log(blk)
	case <-time.After(time.Second * 5):
Jeromy's avatar
Jeromy committed
551 552 553 554 555 556 557 558
		p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
		if len(p1wl) != 1 {
			t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
		} else if !p1wl[0].Equals(blocks[0].Cid()) {
			t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
		} else {
			t.Log("had correct wantlist, somehow")
		}
559
		t.Fatal("timed out waiting on block")
Jeromy's avatar
Jeromy committed
560 561 562 563 564 565 566 567 568
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
569 570 571

func TestWantlistCleanup(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
572
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
573
	defer ig.Close()
574 575
	bg := blocksutil.NewBlockGenerator()

dirkmc's avatar
dirkmc committed
576 577 578
	instances := ig.Instances(2)
	instance := instances[0]
	bswap := instance.Exchange
579 580
	blocks := bg.Blocks(20)

581
	var keys []cid.Cid
582
	for _, b := range blocks {
583
		keys = append(keys, b.Cid())
584 585
	}

dirkmc's avatar
dirkmc committed
586
	// Once context times out, key should be removed from wantlist
587 588 589 590 591 592 593 594 595
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err := bswap.GetBlock(ctx, keys[0])
	if err != context.DeadlineExceeded {
		t.Fatal("shouldnt have fetched any blocks")
	}

	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
596
	if len(bswap.GetWantHaves()) > 0 {
597 598 599
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
600
	// Once context times out, keys should be removed from wantlist
601 602 603 604 605 606 607 608 609 610
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
	defer cancel()
	_, err = bswap.GetBlocks(ctx, keys[:10])
	if err != nil {
		t.Fatal(err)
	}

	<-ctx.Done()
	time.Sleep(time.Millisecond * 50)

dirkmc's avatar
dirkmc committed
611
	if len(bswap.GetWantHaves()) > 0 {
612 613 614
		t.Fatal("should not have anyting in wantlist")
	}

dirkmc's avatar
dirkmc committed
615
	// Send want for single block, with no timeout
616 617 618 619 620
	_, err = bswap.GetBlocks(context.Background(), keys[:1])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
621
	// Send want for 10 blocks
622 623 624 625 626 627
	ctx, cancel = context.WithCancel(context.Background())
	_, err = bswap.GetBlocks(ctx, keys[10:])
	if err != nil {
		t.Fatal(err)
	}

dirkmc's avatar
dirkmc committed
628 629
	// Even after 50 milli-seconds we haven't explicitly cancelled anything
	// and no timeouts have expired, so we should have 11 want-haves
630
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
631 632
	if len(bswap.GetWantHaves()) != 11 {
		t.Fatal("should have 11 keys in wantlist")
633 634
	}

dirkmc's avatar
dirkmc committed
635 636
	// Cancel the timeout for the request for 10 blocks. This should remove
	// the want-haves
637
	cancel()
dirkmc's avatar
dirkmc committed
638 639

	// Once the cancel is processed, we are left with the request for 1 block
640
	time.Sleep(time.Millisecond * 50)
dirkmc's avatar
dirkmc committed
641
	if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
642 643 644
		t.Fatal("should only have keys[0] in wantlist")
	}
}
645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661

// assertLedgerMatch checks that two receipts mirror each other: what ra sent
// is what rb received and vice versa, and both report the same exchanged
// count. It returns an error describing the first mismatch, or nil.
func assertLedgerMatch(ra, rb *decision.Receipt) error {
	switch {
	case ra.Sent != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
	case ra.Recv != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	default:
		return nil
	}
}

662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
// assertLedgerEqual checks that two receipts carry the same Value, Sent, Recv
// and Exchanged fields. It returns an error describing the first difference,
// or nil when all four match.
func assertLedgerEqual(ra, rb *decision.Receipt) error {
	switch {
	case ra.Value != rb.Value:
		return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
	case ra.Sent != rb.Sent:
		return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
	case ra.Recv != rb.Recv:
		return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
	case ra.Exchanged != rb.Exchanged:
		return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
	default:
		return nil
	}
}

// newReceipt builds the expected receipt for peer "test" from the given
// counters, with Value computed as sent / (1 + recv).
func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
	r := new(decision.Receipt)
	r.Peer = "test"
	r.Value = float64(sent) / (1 + float64(recv))
	r.Sent = sent
	r.Recv = recv
	r.Exchanged = exchanged
	return r
}

func TestBitswapLedgerOneWay(t *testing.T) {
693
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
694
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
695
	defer ig.Close()
696 697 698 699
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when one peer sends block to another")

700
	instances := ig.Instances(2)
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
	blocks := bg.Blocks(1)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

717
	// compare peer ledger receipts
718 719 720 721 722
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

723 724 725 726 727 728 729 730 731 732 733 734
	// check that receipts have intended values
	ratest := newReceipt(1, 0, 1)
	err = assertLedgerEqual(ratest, ra)
	if err != nil {
		t.Fatal(err)
	}
	rbtest := newReceipt(0, 1, 1)
	err = assertLedgerEqual(rbtest, rb)
	if err != nil {
		t.Fatal(err)
	}

735 736 737 738 739 740 741 742 743
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}

744
func TestBitswapLedgerTwoWay(t *testing.T) {
745
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
746
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
747
	defer ig.Close()
748 749 750 751
	bg := blocksutil.NewBlockGenerator()

	t.Log("Test ledgers match when two peers send one block to each other")

752
	instances := ig.Instances(2)
753 754 755 756 757 758 759 760 761 762 763 764 765
	blocks := bg.Blocks(2)
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	err = instances[1].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
766
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
767 768 769 770 771 772
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
773
	blk, err := instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
774 775 776 777 778 779 780
	if err != nil {
		t.Fatal(err)
	}

	ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
	rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)

781
	// compare peer ledger receipts
782 783 784 785 786
	err = assertLedgerMatch(ra, rb)
	if err != nil {
		t.Fatal(err)
	}

787 788 789 790 791 792 793 794 795 796 797 798
	// check that receipts have intended values
	rtest := newReceipt(1, 1, 2)
	err = assertLedgerEqual(rtest, ra)
	if err != nil {
		t.Fatal(err)
	}

	err = assertLedgerEqual(rtest, rb)
	if err != nil {
		t.Fatal(err)
	}

799 800 801 802 803 804 805 806
	t.Log(blk)
	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862

// testingScoreLedger is a stub score ledger for tests. It stores the scoring
// callback handed to Start and signals Start and Stop calls by closing the
// corresponding channel.
type testingScoreLedger struct {
	scorePeer deciface.ScorePeerFunc // callback received by Start; nil until then
	started   chan struct{}          // closed by Start
	closed    chan struct{}          // closed by Stop
}

// newTestingScoreLedger returns a ledger with open started/closed channels
// and no scoring callback set.
func newTestingScoreLedger() *testingScoreLedger {
	tsl := &testingScoreLedger{
		started: make(chan struct{}),
		closed:  make(chan struct{}),
	}
	return tsl
}

// GetReceipt always returns nil; the stub keeps no per-peer accounting.
func (tsl *testingScoreLedger) GetReceipt(p peer.ID) *deciface.Receipt {
	return nil
}

// The accounting and connection callbacks below are intentionally no-ops.
func (tsl *testingScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
func (tsl *testingScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}
func (tsl *testingScoreLedger) PeerConnected(p peer.ID)             {}
func (tsl *testingScoreLedger) PeerDisconnected(p peer.ID)          {}

// Start records the scoring callback and closes the started channel. Calling
// it a second time would panic on the repeated close.
func (tsl *testingScoreLedger) Start(scorePeer deciface.ScorePeerFunc) {
	tsl.scorePeer = scorePeer
	close(tsl.started)
}

// Stop closes the closed channel. Calling it a second time would panic on
// the repeated close.
func (tsl *testingScoreLedger) Stop() {
	close(tsl.closed)
}

// TestWithScoreLedger tests start and stop of a custom score ledger: the
// ledger must be started (with a non-nil score function) within five seconds
// of creating the instance, and stopped within five seconds of closing it.
func TestWithScoreLedger(t *testing.T) {
	const timeout = 5 * time.Second

	ledger := newTestingScoreLedger()
	vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	opts := []bitswap.Option{bitswap.WithScoreLedger(ledger)}
	ig := testinstance.NewTestInstanceGenerator(vnet, nil, opts)
	defer ig.Close()
	inst := ig.Next()
	defer inst.Exchange.Close()

	select {
	case <-time.After(timeout):
		t.Fatal("Expected the score ledger to be started within 5s")
	case <-ledger.started:
		if ledger.scorePeer == nil {
			t.Fatal("Expected the score function to be initialized")
		}
	}

	inst.Exchange.Close()
	select {
	case <-time.After(timeout):
		t.Fatal("Expected the score ledger to be closed within 5s")
	case <-ledger.closed:
	}
}
Tomasz Zdybał's avatar
Tomasz Zdybał committed
863 864 865 866 867 868 869

// logItem is one message recorded by mockWireTap.
type logItem struct {
	dir byte                 // 'r' for a received message, 's' for a sent one
	pid peer.ID              // peer passed to the wiretap callback
	msg bsmsg.BitSwapMessage // the message itself
}
type mockWireTap struct {
870
	mu  sync.Mutex
Tomasz Zdybał's avatar
Tomasz Zdybał committed
871 872 873 874
	log []logItem
}

func (m *mockWireTap) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) {
875 876
	m.mu.Lock()
	defer m.mu.Unlock()
Tomasz Zdybał's avatar
Tomasz Zdybał committed
877 878 879
	m.log = append(m.log, logItem{'r', p, msg})
}
func (m *mockWireTap) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) {
880 881
	m.mu.Lock()
	defer m.mu.Unlock()
Tomasz Zdybał's avatar
Tomasz Zdybał committed
882 883 884
	m.log = append(m.log, logItem{'s', p, msg})
}

885 886 887 888 889 890
// getLog returns the entries recorded so far. The returned slice has its
// capacity clipped to its length, so an append by the caller cannot write
// into the wiretap's backing array.
func (m *mockWireTap) getLog() []logItem {
	m.mu.Lock()
	defer m.mu.Unlock()

	n := len(m.log)
	return m.log[:n:n]
}

Tomasz Zdybał's avatar
Tomasz Zdybał committed
891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933
func TestWireTap(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
	ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
	defer ig.Close()
	bg := blocksutil.NewBlockGenerator()

	instances := ig.Instances(3)
	blocks := bg.Blocks(2)

	// Install WireTap
	wiretap := new(mockWireTap)
	bitswap.EnableWireTap(wiretap)(instances[0].Exchange)

	// First peer has block
	err := instances[0].Exchange.HasBlock(blocks[0])
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Second peer broadcasts want for block CID
	// (Received by first and third peers)
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
	if err != nil {
		t.Fatal(err)
	}

	// When second peer receives block, it should send out a cancel, so third
	// peer should no longer keep second peer's want
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
			return fmt.Errorf("should have no items in other peers wantlist")
		}
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}

934 935
	log := wiretap.getLog()

Tomasz Zdybał's avatar
Tomasz Zdybał committed
936
	// After communication, 3 messages should be logged via WireTap
937
	if l := len(log); l != 3 {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
938 939 940 941
		t.Fatal("expected 3 items logged via WireTap, found", l)
	}

	// Received: 'Have'
942
	if log[0].dir != 'r' {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
943 944
		t.Error("expected message to be received")
	}
945 946
	if log[0].pid != instances[1].Peer {
		t.Error("expected peer", instances[1].Peer, ", found", log[0].pid)
Tomasz Zdybał's avatar
Tomasz Zdybał committed
947
	}
948
	if l := len(log[0].msg.Wantlist()); l != 1 {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
949 950
		t.Fatal("expected 1 entry in Wantlist, found", l)
	}
951
	if log[0].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Have {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
952 953 954 955
		t.Error("expected WantType equal to 'Have', found 'Block'")
	}

	// Sent: Block
956
	if log[1].dir != 's' {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
957 958
		t.Error("expected message to be sent")
	}
959 960
	if log[1].pid != instances[1].Peer {
		t.Error("expected peer", instances[1].Peer, ", found", log[1].pid)
Tomasz Zdybał's avatar
Tomasz Zdybał committed
961
	}
962
	if l := len(log[1].msg.Blocks()); l != 1 {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
963 964
		t.Fatal("expected 1 entry in Blocks, found", l)
	}
965
	if log[1].msg.Blocks()[0].Cid() != blocks[0].Cid() {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
966 967 968 969
		t.Error("wrong block Cid")
	}

	// Received: 'Cancel'
970
	if log[2].dir != 'r' {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
971 972
		t.Error("expected message to be received")
	}
973 974
	if log[2].pid != instances[1].Peer {
		t.Error("expected peer", instances[1].Peer, ", found", log[2].pid)
Tomasz Zdybał's avatar
Tomasz Zdybał committed
975
	}
976
	if l := len(log[2].msg.Wantlist()); l != 1 {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
977 978
		t.Fatal("expected 1 entry in Wantlist, found", l)
	}
979
	if log[2].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Block {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
980 981
		t.Error("expected WantType equal to 'Block', found 'Have'")
	}
982
	if log[2].msg.Wantlist()[0].Cancel != true {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
		t.Error("expected entry with Cancel set to 'true'")
	}

	// After disabling WireTap, no new messages are logged
	bitswap.DisableWireTap()(instances[0].Exchange)

	err = instances[0].Exchange.HasBlock(blocks[1])
	if err != nil {
		t.Fatal(err)
	}
	_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
	if err != nil {
		t.Fatal(err)
	}
	if err = tu.WaitFor(ctx, func() error {
		if len(instances[1].Exchange.GetWantlist()) != 0 {
			return fmt.Errorf("shouldnt have anything in wantlist")
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}

1006 1007 1008
	log = wiretap.getLog()

	if l := len(log); l != 3 {
Tomasz Zdybał's avatar
Tomasz Zdybał committed
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
		t.Fatal("expected 3 items logged via WireTap, found", l)
	}

	for _, inst := range instances {
		err := inst.Exchange.Close()
		if err != nil {
			t.Fatal(err)
		}
	}
}