session_test.go 16.2 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4
package session

import (
	"context"
Dirk McCormick's avatar
Dirk McCormick committed
5
	"sync"
hannahhoward's avatar
hannahhoward committed
6 7 8
	"testing"
	"time"

9 10 11 12
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
13
	bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
14
	"github.com/ipfs/go-bitswap/internal/testutil"
hannahhoward's avatar
hannahhoward committed
15 16
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
17
	delay "github.com/ipfs/go-ipfs-delay"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
19 20
)

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// mockSessionMgr is a test double for the session manager handed to New.
// It records whether RemoveSession was called and accumulates every CID
// passed to CancelSessionWants so tests can inspect them afterwards.
type mockSessionMgr struct {
	// lk guards removeSession and cancels.
	lk            sync.Mutex
	// removeSession is set to true by RemoveSession.
	removeSession bool
	// cancels holds the CIDs received through CancelSessionWants, in call order.
	cancels       []cid.Cid
}

// newMockSessionMgr returns a mockSessionMgr with nothing recorded yet.
func newMockSessionMgr() *mockSessionMgr {
	return new(mockSessionMgr)
}

// removeSessionCalled reports whether RemoveSession has been invoked on
// this mock. The flag is read while holding the mutex.
func (msm *mockSessionMgr) removeSessionCalled() bool {
	msm.lk.Lock()
	called := msm.removeSession
	msm.lk.Unlock()
	return called
}

// cancelled returns the CIDs recorded so far by CancelSessionWants. The
// slice header is read under the mutex; the returned slice is not a copy.
func (msm *mockSessionMgr) cancelled() []cid.Cid {
	msm.lk.Lock()
	recorded := msm.cancels
	msm.lk.Unlock()
	return recorded
}

// RemoveSession records that removal was requested. The session id is
// ignored; only the fact that the call happened is kept.
func (msm *mockSessionMgr) RemoveSession(sesid uint64) {
	msm.lk.Lock()
	msm.removeSession = true
	msm.lk.Unlock()
}

// CancelSessionWants appends wants to the list of recorded cancels. The
// session id is ignored.
func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
	msm.lk.Lock()
	msm.cancels = append(msm.cancels, wants...)
	msm.lk.Unlock()
}

55 56
func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
	return bsspm.New(1, newFakePeerTagger())
hannahhoward's avatar
hannahhoward committed
57 58
}

59 60 61 62
func newFakePeerTagger() *fakePeerTagger {
	return &fakePeerTagger{
		protectedPeers: make(map[peer.ID]map[string]struct{}),
	}
dirkmc's avatar
dirkmc committed
63 64
}

65 66 67
type fakePeerTagger struct {
	lk             sync.Mutex
	protectedPeers map[peer.ID]map[string]struct{}
hannahhoward's avatar
hannahhoward committed
68 69
}

70 71 72 73 74 75 76 77 78 79 80 81 82
// TagPeer and UntagPeer are intentionally empty: the fake only tracks
// protection (see Protect / Unprotect), not tag values.
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, val int) {}
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string)        {}

// Protect records that peer p is protected under tag, creating the peer's
// tag set on first use. Protecting the same (peer, tag) twice has no
// additional effect.
func (fpt *fakePeerTagger) Protect(p peer.ID, tag string) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

	tags, ok := fpt.protectedPeers[p]
	if !ok {
		tags = make(map[string]struct{})
		fpt.protectedPeers[p] = tags
	}
	tags[tag] = struct{}{}
}
84 85 86 87 88 89 90 91 92 93 94

func (fpt *fakePeerTagger) Unprotect(p peer.ID, tag string) bool {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

	if tags, ok := fpt.protectedPeers[p]; ok {
		delete(tags, tag)
		return len(tags) > 0
	}

	return false
hannahhoward's avatar
hannahhoward committed
95 96
}

97
func (fpt *fakePeerTagger) isProtected(p peer.ID) bool {
Dirk McCormick's avatar
Dirk McCormick committed
98 99 100
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

101
	return len(fpt.protectedPeers[p]) > 0
Dirk McCormick's avatar
Dirk McCormick committed
102 103
}

104 105 106 107 108 109 110
// fakeProviderFinder is a test double for the provider finder passed to New.
// It never yields providers; it only reports which CIDs were asked about.
type fakeProviderFinder struct {
	// findMorePeersRequested receives the CID given to each
	// FindProvidersAsync call.
	findMorePeersRequested chan cid.Cid
}

func newFakeProviderFinder() *fakeProviderFinder {
	return &fakeProviderFinder{
		findMorePeersRequested: make(chan cid.Cid, 1),
dirkmc's avatar
dirkmc committed
111 112
	}
}
113 114 115 116 117 118 119 120 121 122

func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID {
	go func() {
		select {
		case fpf.findMorePeersRequested <- k:
		case <-ctx.Done():
		}
	}()

	return make(chan peer.ID)
hannahhoward's avatar
hannahhoward committed
123 124
}

Dirk McCormick's avatar
Dirk McCormick committed
125 126 127 128
// wantReq captures the CIDs of a single BroadcastWantHaves call made on
// fakePeerManager.
type wantReq struct {
	// cids is the slice passed to BroadcastWantHaves (not copied).
	cids []cid.Cid
}

dirkmc's avatar
dirkmc committed
129
type fakePeerManager struct {
Dirk McCormick's avatar
Dirk McCormick committed
130
	wantReqs chan wantReq
131 132
}

dirkmc's avatar
dirkmc committed
133
func newFakePeerManager() *fakePeerManager {
Dirk McCormick's avatar
Dirk McCormick committed
134 135 136
	return &fakePeerManager{
		wantReqs: make(chan wantReq, 1),
	}
137 138
}

dirkmc's avatar
dirkmc committed
139 140 141 142 143
// RegisterSession ignores both arguments and always returns true.
func (pm *fakePeerManager) RegisterSession(_ peer.ID, _ bspm.Session) bool {
	return true
}
// UnregisterSession and SendWants are intentionally empty: nothing in this
// file inspects session unregistration or wants sent to a specific peer.
func (pm *fakePeerManager) UnregisterSession(uint64)                                 {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
Dirk McCormick's avatar
Dirk McCormick committed
144 145 146 147 148 149
// BroadcastWantHaves publishes the broadcast CIDs on wantReqs so the test
// can observe them. If nobody takes the request before ctx is done, the
// request is dropped.
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
	req := wantReq{cids: cids}
	select {
	case <-ctx.Done():
	case pm.wantReqs <- req:
	}
}
150
// SendCancels is intentionally empty; the fake does not record cancels.
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}
151

hannahhoward's avatar
hannahhoward committed
152
func TestSessionGetBlocks(t *testing.T) {
153
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
154 155
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
156
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
157 158
	sim := bssim.New()
	bpm := bsbpm.New()
159 160
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
161
	id := testutil.GenerateSessionID()
162 163
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
hannahhoward's avatar
hannahhoward committed
164
	blockGenerator := blocksutil.NewBlockGenerator()
165
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
166 167 168 169
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
170

dirkmc's avatar
dirkmc committed
171
	_, err := session.GetBlocks(ctx, cids)
172

hannahhoward's avatar
hannahhoward committed
173 174 175 176
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
177
	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
178
	receivedWantReq := <-fpm.wantReqs
179

dirkmc's avatar
dirkmc committed
180 181 182 183 184 185 186
	// Should have registered session's interest in blocks
	intSes := sim.FilterSessionInterested(id, cids)
	if !testutil.MatchKeysIgnoreOrder(intSes[0], cids) {
		t.Fatal("did not register session interest in blocks")
	}

	// Should have sent out broadcast request for wants
187
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
188 189 190
		t.Fatal("did not enqueue correct initial number of wants")
	}

dirkmc's avatar
dirkmc committed
191
	// Simulate receiving HAVEs from several peers
192
	peers := testutil.GeneratePeers(5)
hannahhoward's avatar
hannahhoward committed
193
	for i, p := range peers {
194
		blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
dirkmc's avatar
dirkmc committed
195
		session.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{blk.Cid()}, []cid.Cid{})
hannahhoward's avatar
hannahhoward committed
196 197
	}

198 199
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
200
	// Verify new peers were recorded
Dirk McCormick's avatar
Dirk McCormick committed
201
	if !testutil.MatchPeersIgnoreOrder(fspm.Peers(), peers) {
dirkmc's avatar
dirkmc committed
202
		t.Fatal("peers not recorded by the peer manager")
hannahhoward's avatar
hannahhoward committed
203 204
	}

dirkmc's avatar
dirkmc committed
205 206 207 208
	// Verify session still wants received blocks
	_, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
209 210
	}

dirkmc's avatar
dirkmc committed
211 212
	// Simulate receiving DONT_HAVE for a CID
	session.ReceiveFrom(peers[0], []cid.Cid{}, []cid.Cid{}, []cid.Cid{blks[0].Cid()})
213

214 215
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
216 217 218 219
	// Verify session still wants received blocks
	_, unwanted = sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
220 221
	}

dirkmc's avatar
dirkmc committed
222 223 224
	// Simulate receiving block for a CID
	session.ReceiveFrom(peers[1], []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

Dirk McCormick's avatar
Dirk McCormick committed
225
	time.Sleep(10 * time.Millisecond)
226

dirkmc's avatar
dirkmc committed
227 228 229 230
	// Verify session no longer wants received block
	wanted, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) != 1 || !unwanted[0].Cid().Equals(blks[0].Cid()) {
		t.Fatal("session wants block that has already been received")
hannahhoward's avatar
hannahhoward committed
231
	}
dirkmc's avatar
dirkmc committed
232 233
	if len(wanted) != len(blks)-1 {
		t.Fatal("session wants incorrect number of blocks")
234
	}
Dirk McCormick's avatar
Dirk McCormick committed
235 236 237 238 239 240

	// Shut down session
	cancel()

	time.Sleep(10 * time.Millisecond)

241 242 243
	// Verify session was removed
	if !sm.removeSessionCalled() {
		t.Fatal("expected session to be removed")
Dirk McCormick's avatar
Dirk McCormick committed
244
	}
hannahhoward's avatar
hannahhoward committed
245 246 247
}

func TestSessionFindMorePeers(t *testing.T) {
248
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
249
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
250 251
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
252
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
253 254
	sim := bssim.New()
	bpm := bsbpm.New()
255 256
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
257
	id := testutil.GenerateSessionID()
258 259
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
260
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
261
	blockGenerator := blocksutil.NewBlockGenerator()
262
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
263 264 265 266
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
dirkmc's avatar
dirkmc committed
267
	_, err := session.GetBlocks(ctx, cids)
hannahhoward's avatar
hannahhoward committed
268 269 270 271
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
272
	// The session should initially broadcast want-haves
273
	select {
Dirk McCormick's avatar
Dirk McCormick committed
274
	case <-fpm.wantReqs:
275 276 277
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
278

hannahhoward's avatar
hannahhoward committed
279
	// receive a block to trigger a tick reset
280 281 282
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
283
	p := testutil.GeneratePeers(1)[0]
284 285

	blk := blks[0]
dirkmc's avatar
dirkmc committed
286 287 288 289
	session.ReceiveFrom(p, []cid.Cid{blk.Cid()}, []cid.Cid{}, []cid.Cid{})

	// The session should now time out waiting for a response and broadcast
	// want-haves again
290
	select {
Dirk McCormick's avatar
Dirk McCormick committed
291
	case <-fpm.wantReqs:
292 293 294
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
295

296
	// The session should keep broadcasting periodically until it receives a response
297
	select {
Dirk McCormick's avatar
Dirk McCormick committed
298
	case receivedWantReq := <-fpm.wantReqs:
299
		if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
300 301
			t.Fatal("did not rebroadcast whole live list")
		}
302 303 304 305 306 307 308
		// Make sure the first block is not included because it has already
		// been received
		for _, c := range receivedWantReq.cids {
			if c.Equals(cids[0]) {
				t.Fatal("should not braodcast block that was already received")
			}
		}
309 310
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
311
	}
312

dirkmc's avatar
dirkmc committed
313
	// The session should eventually try to find more peers
314
	select {
315
	case <-fpf.findMorePeersRequested:
316 317
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
318 319
	}
}
320

321 322 323
func TestSessionOnPeersExhausted(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
324 325
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
326 327
	fpf := newFakeProviderFinder()

328 329 330 331 332
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
333 334
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
335 336 337 338 339 340 341 342 343 344 345 346 347
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	_, err := session.GetBlocks(ctx, cids)

	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
348
	receivedWantReq := <-fpm.wantReqs
349 350 351 352 353 354 355 356 357 358

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
		t.Fatal("did not enqueue correct initial number of wants")
	}

	// Signal that all peers have send DONT_HAVE for two of the wants
	session.onPeersExhausted(cids[len(cids)-2:])

	// Wait for want request
Dirk McCormick's avatar
Dirk McCormick committed
359
	receivedWantReq = <-fpm.wantReqs
360 361 362 363 364 365 366

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != 2 {
		t.Fatal("did not enqueue correct initial number of wants")
	}
}

367
func TestSessionFailingToGetFirstBlock(t *testing.T) {
368
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
369
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
370 371
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
372
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
373 374
	sim := bssim.New()
	bpm := bsbpm.New()
375 376
	notif := notifications.New()
	defer notif.Shutdown()
377
	id := testutil.GenerateSessionID()
378 379
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
380 381 382 383 384 385 386 387 388 389 390 391
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(4)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	startTick := time.Now()
	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
392
	// The session should initially broadcast want-haves
393
	select {
Dirk McCormick's avatar
Dirk McCormick committed
394
	case <-fpm.wantReqs:
395 396 397 398
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}

dirkmc's avatar
dirkmc committed
399
	// Verify a broadcast was made
400
	select {
Dirk McCormick's avatar
Dirk McCormick committed
401
	case receivedWantReq := <-fpm.wantReqs:
402 403 404 405 406 407 408
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}

dirkmc's avatar
dirkmc committed
409
	// Wait for a request to find more peers to occur
410
	select {
411
	case k := <-fpf.findMorePeersRequested:
412 413 414 415 416 417 418 419
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
	}
	firstTickLength := time.Since(startTick)

dirkmc's avatar
dirkmc committed
420
	// Wait for another broadcast to occur
421
	select {
Dirk McCormick's avatar
Dirk McCormick committed
422
	case receivedWantReq := <-fpm.wantReqs:
423 424 425 426 427 428
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
429 430

	// Wait for another broadcast to occur
431 432
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
433
	case receivedWantReq := <-fpm.wantReqs:
434 435 436 437 438 439
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
440 441

	// Tick should take longer
442 443 444 445
	consecutiveTickLength := time.Since(startTick)
	if firstTickLength > consecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}
dirkmc's avatar
dirkmc committed
446 447

	// Wait for another broadcast to occur
448 449
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
450
	case receivedWantReq := <-fpm.wantReqs:
451 452 453 454 455 456
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
457 458

	// Tick should take longer
459 460 461 462 463
	secondConsecutiveTickLength := time.Since(startTick)
	if consecutiveTickLength > secondConsecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}

dirkmc's avatar
dirkmc committed
464
	// Should not have tried to find peers on consecutive ticks
465
	select {
466
	case <-fpf.findMorePeersRequested:
dirkmc's avatar
dirkmc committed
467
		t.Fatal("Should not have tried to find peers on consecutive ticks")
468 469 470
	default:
	}

dirkmc's avatar
dirkmc committed
471
	// Wait for rebroadcast to occur
472
	select {
473
	case k := <-fpf.findMorePeersRequested:
474 475 476 477 478 479 480
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not rebroadcast to find more peers")
	}
}
481 482

func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
Dirk McCormick's avatar
Dirk McCormick committed
483 484
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
485
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
486 487
	sim := bssim.New()
	bpm := bsbpm.New()
488 489 490
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
491
	sm := newMockSessionMgr()
492 493 494

	// Create a new session with its own context
	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
495
	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522

	timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer timerCancel()

	// Request a block with a new context
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(1)
	getctx, getcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer getcancel()

	getBlocksCh, err := session.GetBlocks(getctx, []cid.Cid{blks[0].Cid()})
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Cancel the session context
	sesscancel()

	// Expect the GetBlocks() channel to be closed
	select {
	case _, ok := <-getBlocksCh:
		if ok {
			t.Fatal("expected channel to be closed but was not closed")
		}
	case <-timerCtx.Done():
		t.Fatal("expected channel to be closed before timeout")
	}
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556

	time.Sleep(10 * time.Millisecond)

	// Expect RemoveSession to be called
	if !sm.removeSessionCalled() {
		t.Fatal("expected onShutdown to be called")
	}
}

// TestSessionOnShutdownCalled verifies that calling Shutdown on a session
// causes it to ask its manager to remove it.
func TestSessionOnShutdownCalled(t *testing.T) {
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
	fpf := newFakeProviderFinder()
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
	sm := newMockSessionMgr()

	// Create a new session with its own context
	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer sesscancel()
	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")

	// Shutdown the session
	session.Shutdown()

	// Give the session time to process the shutdown
	time.Sleep(10 * time.Millisecond)

	// Expect RemoveSession to be called
	if !sm.removeSessionCalled() {
		t.Fatal("expected RemoveSession to be called")
	}
}
dirkmc's avatar
dirkmc committed
558

559 560
func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
	ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
561 562
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
563 564
	fpf := newFakeProviderFinder()

dirkmc's avatar
dirkmc committed
565 566 567 568 569
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
570 571
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
dirkmc's avatar
dirkmc committed
572 573 574 575 576 577 578 579 580 581
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(2)
	cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
582
	<-fpm.wantReqs
dirkmc's avatar
dirkmc committed
583 584 585 586 587 588 589 590 591 592 593 594

	// Shut down session
	cancelCtx()

	// Simulate receiving block for a CID
	peer := testutil.GeneratePeers(1)[0]
	session.ReceiveFrom(peer, []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

	time.Sleep(5 * time.Millisecond)

	// If we don't get a panic then the test is considered passing
}