session_test.go 16.2 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4
package session

import (
	"context"
Dirk McCormick's avatar
Dirk McCormick committed
5
	"sync"
hannahhoward's avatar
hannahhoward committed
6 7 8
	"testing"
	"time"

9 10 11 12
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
13
	bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
14
	"github.com/ipfs/go-bitswap/internal/testutil"
hannahhoward's avatar
hannahhoward committed
15 16
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
17
	delay "github.com/ipfs/go-ipfs-delay"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
19 20
)

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
// mockSessionMgr is a test double that records the RemoveSession and
// CancelSessionWants calls made on it so tests can assert on them afterwards.
type mockSessionMgr struct {
	// lk guards removeSession and cancels.
	lk            sync.Mutex
	// removeSession is set to true once RemoveSession has been called.
	removeSession bool
	// cancels accumulates every CID passed to CancelSessionWants.
	cancels       []cid.Cid
}

// newMockSessionMgr returns a mock session manager with no calls recorded.
func newMockSessionMgr() *mockSessionMgr {
	return new(mockSessionMgr)
}

// removeSessionCalled reports whether RemoveSession has been invoked on this
// mock. The flag is read while holding the mock's lock.
func (msm *mockSessionMgr) removeSessionCalled() bool {
	msm.lk.Lock()
	called := msm.removeSession
	msm.lk.Unlock()
	return called
}

// cancelled returns the CIDs recorded by CancelSessionWants so far.
//
// A copy is returned rather than the internal slice: the internal slice is
// only protected while the lock is held, so handing it out would let callers
// share (and potentially write into) the backing array that later
// CancelSessionWants calls append to. A nil result is returned when nothing
// has been recorded.
func (msm *mockSessionMgr) cancelled() []cid.Cid {
	msm.lk.Lock()
	defer msm.lk.Unlock()
	return append([]cid.Cid(nil), msm.cancels...)
}

// RemoveSession records that removal was requested. The session ID is
// ignored; only the fact that the call happened is remembered.
func (msm *mockSessionMgr) RemoveSession(sesid uint64) {
	msm.lk.Lock()
	msm.removeSession = true
	msm.lk.Unlock()
}

// CancelSessionWants appends wants to the list of cancelled CIDs kept by the
// mock. The session ID is ignored.
func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
	msm.lk.Lock()
	msm.cancels = append(msm.cancels, wants...)
	msm.lk.Unlock()
}

55 56
func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
	return bsspm.New(1, newFakePeerTagger())
hannahhoward's avatar
hannahhoward committed
57 58
}

59 60 61 62
func newFakePeerTagger() *fakePeerTagger {
	return &fakePeerTagger{
		protectedPeers: make(map[peer.ID]map[string]struct{}),
	}
dirkmc's avatar
dirkmc committed
63 64
}

65 66 67
type fakePeerTagger struct {
	lk             sync.Mutex
	protectedPeers map[peer.ID]map[string]struct{}
hannahhoward's avatar
hannahhoward committed
68 69
}

70 71 72 73 74 75 76 77 78 79 80 81 82
// TagPeer is a no-op; the fake does not track tags.
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, val int) {
}

// UntagPeer is a no-op; the fake does not track tags.
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
}

// Protect marks peer p as protected under tag, creating the peer's tag set
// on first use.
func (fpt *fakePeerTagger) Protect(p peer.ID, tag string) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

	if _, exists := fpt.protectedPeers[p]; !exists {
		fpt.protectedPeers[p] = make(map[string]struct{})
	}
	fpt.protectedPeers[p][tag] = struct{}{}
}
84 85 86 87 88 89 90 91 92 93 94

func (fpt *fakePeerTagger) Unprotect(p peer.ID, tag string) bool {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

	if tags, ok := fpt.protectedPeers[p]; ok {
		delete(tags, tag)
		return len(tags) > 0
	}

	return false
hannahhoward's avatar
hannahhoward committed
95 96
}

97
func (fpt *fakePeerTagger) isProtected(p peer.ID) bool {
Dirk McCormick's avatar
Dirk McCormick committed
98 99 100
	fpt.lk.Lock()
	defer fpt.lk.Unlock()

101
	return len(fpt.protectedPeers[p]) > 0
Dirk McCormick's avatar
Dirk McCormick committed
102 103
}

104 105 106 107 108 109 110
// fakeProviderFinder is a test double for a provider finder. It publishes
// the CID of every FindProvidersAsync call on findMorePeersRequested so tests
// can observe when the session asked for more peers.
type fakeProviderFinder struct {
	// findMorePeersRequested receives the key of each provider lookup.
	findMorePeersRequested chan cid.Cid
}

func newFakeProviderFinder() *fakeProviderFinder {
	return &fakeProviderFinder{
		findMorePeersRequested: make(chan cid.Cid, 1),
dirkmc's avatar
dirkmc committed
111 112
	}
}
113 114 115 116 117 118 119 120 121 122

func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID {
	go func() {
		select {
		case fpf.findMorePeersRequested <- k:
		case <-ctx.Done():
		}
	}()

	return make(chan peer.ID)
hannahhoward's avatar
hannahhoward committed
123 124
}

Dirk McCormick's avatar
Dirk McCormick committed
125 126 127 128
// wantReq captures the CIDs passed to a single BroadcastWantHaves call on
// the fake peer manager.
type wantReq struct {
	cids []cid.Cid
}

dirkmc's avatar
dirkmc committed
129
type fakePeerManager struct {
Dirk McCormick's avatar
Dirk McCormick committed
130
	wantReqs chan wantReq
131 132
}

dirkmc's avatar
dirkmc committed
133
func newFakePeerManager() *fakePeerManager {
Dirk McCormick's avatar
Dirk McCormick committed
134 135 136
	return &fakePeerManager{
		wantReqs: make(chan wantReq, 1),
	}
137 138
}

139
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session)                    {}
dirkmc's avatar
dirkmc committed
140 141
func (pm *fakePeerManager) UnregisterSession(uint64)                                 {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
Dirk McCormick's avatar
Dirk McCormick committed
142 143 144 145 146 147
// BroadcastWantHaves publishes the broadcast CIDs on wantReqs so the test can
// inspect them. If the channel is full it waits until there is room or until
// ctx is done, in which case the request is dropped.
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
	req := wantReq{cids: cids}
	select {
	case <-ctx.Done():
	case pm.wantReqs <- req:
	}
}
148
// SendCancels is a no-op in the fake.
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}
149

hannahhoward's avatar
hannahhoward committed
150
func TestSessionGetBlocks(t *testing.T) {
151
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
152 153
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
154
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
155 156
	sim := bssim.New()
	bpm := bsbpm.New()
157 158
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
159
	id := testutil.GenerateSessionID()
160 161
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
hannahhoward's avatar
hannahhoward committed
162
	blockGenerator := blocksutil.NewBlockGenerator()
163
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
164 165 166 167
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
168

dirkmc's avatar
dirkmc committed
169
	_, err := session.GetBlocks(ctx, cids)
170

hannahhoward's avatar
hannahhoward committed
171 172 173 174
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
175
	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
176
	receivedWantReq := <-fpm.wantReqs
177

dirkmc's avatar
dirkmc committed
178 179 180 181 182 183 184
	// Should have registered session's interest in blocks
	intSes := sim.FilterSessionInterested(id, cids)
	if !testutil.MatchKeysIgnoreOrder(intSes[0], cids) {
		t.Fatal("did not register session interest in blocks")
	}

	// Should have sent out broadcast request for wants
185
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
186 187 188
		t.Fatal("did not enqueue correct initial number of wants")
	}

dirkmc's avatar
dirkmc committed
189
	// Simulate receiving HAVEs from several peers
190
	peers := testutil.GeneratePeers(5)
hannahhoward's avatar
hannahhoward committed
191
	for i, p := range peers {
192
		blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
dirkmc's avatar
dirkmc committed
193
		session.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{blk.Cid()}, []cid.Cid{})
hannahhoward's avatar
hannahhoward committed
194 195
	}

196 197
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
198
	// Verify new peers were recorded
Dirk McCormick's avatar
Dirk McCormick committed
199
	if !testutil.MatchPeersIgnoreOrder(fspm.Peers(), peers) {
dirkmc's avatar
dirkmc committed
200
		t.Fatal("peers not recorded by the peer manager")
hannahhoward's avatar
hannahhoward committed
201 202
	}

dirkmc's avatar
dirkmc committed
203 204 205 206
	// Verify session still wants received blocks
	_, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
207 208
	}

dirkmc's avatar
dirkmc committed
209 210
	// Simulate receiving DONT_HAVE for a CID
	session.ReceiveFrom(peers[0], []cid.Cid{}, []cid.Cid{}, []cid.Cid{blks[0].Cid()})
211

212 213
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
214 215 216 217
	// Verify session still wants received blocks
	_, unwanted = sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
218 219
	}

dirkmc's avatar
dirkmc committed
220 221 222
	// Simulate receiving block for a CID
	session.ReceiveFrom(peers[1], []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

Dirk McCormick's avatar
Dirk McCormick committed
223
	time.Sleep(10 * time.Millisecond)
224

dirkmc's avatar
dirkmc committed
225 226 227 228
	// Verify session no longer wants received block
	wanted, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) != 1 || !unwanted[0].Cid().Equals(blks[0].Cid()) {
		t.Fatal("session wants block that has already been received")
hannahhoward's avatar
hannahhoward committed
229
	}
dirkmc's avatar
dirkmc committed
230 231
	if len(wanted) != len(blks)-1 {
		t.Fatal("session wants incorrect number of blocks")
232
	}
Dirk McCormick's avatar
Dirk McCormick committed
233 234 235 236 237 238

	// Shut down session
	cancel()

	time.Sleep(10 * time.Millisecond)

239 240 241
	// Verify session was removed
	if !sm.removeSessionCalled() {
		t.Fatal("expected session to be removed")
Dirk McCormick's avatar
Dirk McCormick committed
242
	}
hannahhoward's avatar
hannahhoward committed
243 244 245
}

func TestSessionFindMorePeers(t *testing.T) {
246
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
247
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
248 249
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
250
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
251 252
	sim := bssim.New()
	bpm := bsbpm.New()
253 254
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
255
	id := testutil.GenerateSessionID()
256 257
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
258
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
259
	blockGenerator := blocksutil.NewBlockGenerator()
260
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
261 262 263 264
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
dirkmc's avatar
dirkmc committed
265
	_, err := session.GetBlocks(ctx, cids)
hannahhoward's avatar
hannahhoward committed
266 267 268 269
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
270
	// The session should initially broadcast want-haves
271
	select {
Dirk McCormick's avatar
Dirk McCormick committed
272
	case <-fpm.wantReqs:
273 274 275
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
276

hannahhoward's avatar
hannahhoward committed
277
	// receive a block to trigger a tick reset
278 279 280
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
281
	p := testutil.GeneratePeers(1)[0]
282 283

	blk := blks[0]
dirkmc's avatar
dirkmc committed
284 285 286 287
	session.ReceiveFrom(p, []cid.Cid{blk.Cid()}, []cid.Cid{}, []cid.Cid{})

	// The session should now time out waiting for a response and broadcast
	// want-haves again
288
	select {
Dirk McCormick's avatar
Dirk McCormick committed
289
	case <-fpm.wantReqs:
290 291 292
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
293

294
	// The session should keep broadcasting periodically until it receives a response
295
	select {
Dirk McCormick's avatar
Dirk McCormick committed
296
	case receivedWantReq := <-fpm.wantReqs:
297
		if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
298 299
			t.Fatal("did not rebroadcast whole live list")
		}
300 301 302 303 304 305 306
		// Make sure the first block is not included because it has already
		// been received
		for _, c := range receivedWantReq.cids {
			if c.Equals(cids[0]) {
				t.Fatal("should not braodcast block that was already received")
			}
		}
307 308
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
309
	}
310

dirkmc's avatar
dirkmc committed
311
	// The session should eventually try to find more peers
312
	select {
313
	case <-fpf.findMorePeersRequested:
314 315
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
316 317
	}
}
318

319 320 321
func TestSessionOnPeersExhausted(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
322 323
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
324 325
	fpf := newFakeProviderFinder()

326 327 328 329 330
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
331 332
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
333 334 335 336 337 338 339 340 341 342 343 344 345
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	_, err := session.GetBlocks(ctx, cids)

	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
346
	receivedWantReq := <-fpm.wantReqs
347 348 349 350 351 352 353 354 355 356

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
		t.Fatal("did not enqueue correct initial number of wants")
	}

	// Signal that all peers have send DONT_HAVE for two of the wants
	session.onPeersExhausted(cids[len(cids)-2:])

	// Wait for want request
Dirk McCormick's avatar
Dirk McCormick committed
357
	receivedWantReq = <-fpm.wantReqs
358 359 360 361 362 363 364

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != 2 {
		t.Fatal("did not enqueue correct initial number of wants")
	}
}

365
func TestSessionFailingToGetFirstBlock(t *testing.T) {
366
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
367
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
368 369
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
370
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
371 372
	sim := bssim.New()
	bpm := bsbpm.New()
373 374
	notif := notifications.New()
	defer notif.Shutdown()
375
	id := testutil.GenerateSessionID()
376 377
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
378 379 380 381 382 383 384 385 386 387 388 389
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(4)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	startTick := time.Now()
	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
390
	// The session should initially broadcast want-haves
391
	select {
Dirk McCormick's avatar
Dirk McCormick committed
392
	case <-fpm.wantReqs:
393 394 395 396
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}

dirkmc's avatar
dirkmc committed
397
	// Verify a broadcast was made
398
	select {
Dirk McCormick's avatar
Dirk McCormick committed
399
	case receivedWantReq := <-fpm.wantReqs:
400 401 402 403 404 405 406
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}

dirkmc's avatar
dirkmc committed
407
	// Wait for a request to find more peers to occur
408
	select {
409
	case k := <-fpf.findMorePeersRequested:
410 411 412 413 414 415 416 417
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
	}
	firstTickLength := time.Since(startTick)

dirkmc's avatar
dirkmc committed
418
	// Wait for another broadcast to occur
419
	select {
Dirk McCormick's avatar
Dirk McCormick committed
420
	case receivedWantReq := <-fpm.wantReqs:
421 422 423 424 425 426
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
427 428

	// Wait for another broadcast to occur
429 430
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
431
	case receivedWantReq := <-fpm.wantReqs:
432 433 434 435 436 437
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
438 439

	// Tick should take longer
440 441 442 443
	consecutiveTickLength := time.Since(startTick)
	if firstTickLength > consecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}
dirkmc's avatar
dirkmc committed
444 445

	// Wait for another broadcast to occur
446 447
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
448
	case receivedWantReq := <-fpm.wantReqs:
449 450 451 452 453 454
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
455 456

	// Tick should take longer
457 458 459 460 461
	secondConsecutiveTickLength := time.Since(startTick)
	if consecutiveTickLength > secondConsecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}

dirkmc's avatar
dirkmc committed
462
	// Should not have tried to find peers on consecutive ticks
463
	select {
464
	case <-fpf.findMorePeersRequested:
dirkmc's avatar
dirkmc committed
465
		t.Fatal("Should not have tried to find peers on consecutive ticks")
466 467 468
	default:
	}

dirkmc's avatar
dirkmc committed
469
	// Wait for rebroadcast to occur
470
	select {
471
	case k := <-fpf.findMorePeersRequested:
472 473 474 475 476 477 478
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not rebroadcast to find more peers")
	}
}
479 480

func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
Dirk McCormick's avatar
Dirk McCormick committed
481 482
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
483
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
484 485
	sim := bssim.New()
	bpm := bsbpm.New()
486 487 488
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
489
	sm := newMockSessionMgr()
490 491 492

	// Create a new session with its own context
	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
493
	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520

	timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer timerCancel()

	// Request a block with a new context
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(1)
	getctx, getcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer getcancel()

	getBlocksCh, err := session.GetBlocks(getctx, []cid.Cid{blks[0].Cid()})
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Cancel the session context
	sesscancel()

	// Expect the GetBlocks() channel to be closed
	select {
	case _, ok := <-getBlocksCh:
		if ok {
			t.Fatal("expected channel to be closed but was not closed")
		}
	case <-timerCtx.Done():
		t.Fatal("expected channel to be closed before timeout")
	}
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554

	time.Sleep(10 * time.Millisecond)

	// Expect RemoveSession to be called
	if !sm.removeSessionCalled() {
		t.Fatal("expected onShutdown to be called")
	}
}

// TestSessionOnShutdownCalled checks that calling Shutdown on a session
// results in RemoveSession being called on the session manager.
func TestSessionOnShutdownCalled(t *testing.T) {
	peerMgr := newFakePeerManager()
	sessPeerMgr := newFakeSessionPeerManager()
	provFinder := newFakeProviderFinder()
	interestMgr := bssim.New()
	presenceMgr := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	sesID := testutil.GenerateSessionID()
	sessMgr := newMockSessionMgr()

	// The session gets a context of its own.
	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer sesscancel()

	// Create the session and shut it down straight away.
	New(sessctx, sessMgr, sesID, sessPeerMgr, provFinder, interestMgr, peerMgr, presenceMgr, notif, time.Second, delay.Fixed(time.Minute), "").Shutdown()

	// Give the session a moment to process the shutdown.
	time.Sleep(10 * time.Millisecond)

	// RemoveSession must have been called by now.
	if removed := sessMgr.removeSessionCalled(); !removed {
		t.Fatal("expected onShutdown to be called")
	}
}
dirkmc's avatar
dirkmc committed
556

557 558
func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
	ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
559 560
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
561 562
	fpf := newFakeProviderFinder()

dirkmc's avatar
dirkmc committed
563 564 565 566 567
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
568 569
	sm := newMockSessionMgr()
	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
dirkmc's avatar
dirkmc committed
570 571 572 573 574 575 576 577 578 579
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(2)
	cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
580
	<-fpm.wantReqs
dirkmc's avatar
dirkmc committed
581 582 583 584 585 586 587 588 589 590 591 592

	// Shut down session
	cancelCtx()

	// Simulate receiving block for a CID
	peer := testutil.GeneratePeers(1)[0]
	session.ReceiveFrom(peer, []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

	time.Sleep(5 * time.Millisecond)

	// If we don't get a panic then the test is considered passing
}