session_test.go 14 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4
package session

import (
	"context"
Dirk McCormick's avatar
Dirk McCormick committed
5
	"sync"
hannahhoward's avatar
hannahhoward committed
6 7 8
	"testing"
	"time"

9 10 11 12
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
13
	bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
14
	"github.com/ipfs/go-bitswap/internal/testutil"
hannahhoward's avatar
hannahhoward committed
15 16
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
17
	delay "github.com/ipfs/go-ipfs-delay"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
19 20
)

21 22
func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
	return bsspm.New(1, newFakePeerTagger())
hannahhoward's avatar
hannahhoward committed
23 24
}

25
// fakePeerTagger is a stateless stand-in for a peer tagger; its TagPeer and
// UntagPeer methods do nothing.
type fakePeerTagger struct{}

28 29
func newFakePeerTagger() *fakePeerTagger {
	return &fakePeerTagger{}
hannahhoward's avatar
hannahhoward committed
30 31
}

32 33 34
// TagPeer ignores its arguments; the fake records nothing.
func (*fakePeerTagger) TagPeer(peer.ID, string, int) {}
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
hannahhoward's avatar
hannahhoward committed
35 36
}

37 38 39 40 41 42 43
// fakeProviderFinder is a test double for the session's provider finder. It
// reports every FindProvidersAsync call to the test through a channel.
type fakeProviderFinder struct {
	// findMorePeersRequested receives the CID passed to each
	// FindProvidersAsync call.
	findMorePeersRequested chan cid.Cid
}

func newFakeProviderFinder() *fakeProviderFinder {
	return &fakeProviderFinder{
		findMorePeersRequested: make(chan cid.Cid, 1),
dirkmc's avatar
dirkmc committed
44 45
	}
}
46 47 48 49 50 51 52 53 54 55

func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID {
	go func() {
		select {
		case fpf.findMorePeersRequested <- k:
		case <-ctx.Done():
		}
	}()

	return make(chan peer.ID)
hannahhoward's avatar
hannahhoward committed
56 57
}

Dirk McCormick's avatar
Dirk McCormick committed
58 59 60 61
// wantReq captures the CIDs passed to a single BroadcastWantHaves call on the
// fake peer manager.
type wantReq struct {
	// cids is the slice handed to BroadcastWantHaves, stored as-is.
	cids []cid.Cid
}

dirkmc's avatar
dirkmc committed
62
type fakePeerManager struct {
Dirk McCormick's avatar
Dirk McCormick committed
63
	wantReqs chan wantReq
Dirk McCormick's avatar
Dirk McCormick committed
64 65
	lk       sync.Mutex
	cancels  []cid.Cid
66 67
}

dirkmc's avatar
dirkmc committed
68
func newFakePeerManager() *fakePeerManager {
Dirk McCormick's avatar
Dirk McCormick committed
69 70 71
	return &fakePeerManager{
		wantReqs: make(chan wantReq, 1),
	}
72 73
}

dirkmc's avatar
dirkmc committed
74 75 76 77 78
// RegisterSession ignores its arguments and always returns true.
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { return true }
// UnregisterSession is a no-op in the fake.
func (*fakePeerManager) UnregisterSession(uint64) {}
// SendWants is a no-op in the fake; targeted wants are not recorded.
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
Dirk McCormick's avatar
Dirk McCormick committed
79 80 81 82 83 84 85
// BroadcastWantHaves hands the broadcast CIDs to the test via wantReqs. If
// the channel cannot accept the request before ctx is done, the request is
// dropped.
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
	req := wantReq{cids: cids}
	select {
	case <-ctx.Done():
	case pm.wantReqs <- req:
	}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
86 87
	pm.lk.Lock()
	defer pm.lk.Unlock()
Dirk McCormick's avatar
Dirk McCormick committed
88 89
	pm.cancels = append(pm.cancels, cancels...)
}
Dirk McCormick's avatar
Dirk McCormick committed
90 91 92 93 94
// allCancels returns a copy of every CID recorded by SendCancels so far, so
// callers can inspect it without holding the lock.
func (pm *fakePeerManager) allCancels() []cid.Cid {
	pm.lk.Lock()
	defer pm.lk.Unlock()

	snapshot := make([]cid.Cid, len(pm.cancels))
	copy(snapshot, pm.cancels)
	return snapshot
}
95

hannahhoward's avatar
hannahhoward committed
96
func TestSessionGetBlocks(t *testing.T) {
97
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
98 99
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
100
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
101 102
	sim := bssim.New()
	bpm := bsbpm.New()
103 104
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
105
	id := testutil.GenerateSessionID()
106
	session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
hannahhoward's avatar
hannahhoward committed
107
	blockGenerator := blocksutil.NewBlockGenerator()
108
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
109 110 111 112
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
113

dirkmc's avatar
dirkmc committed
114
	_, err := session.GetBlocks(ctx, cids)
115

hannahhoward's avatar
hannahhoward committed
116 117 118 119
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
120
	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
121
	receivedWantReq := <-fpm.wantReqs
122

dirkmc's avatar
dirkmc committed
123 124 125 126 127 128 129
	// Should have registered session's interest in blocks
	intSes := sim.FilterSessionInterested(id, cids)
	if !testutil.MatchKeysIgnoreOrder(intSes[0], cids) {
		t.Fatal("did not register session interest in blocks")
	}

	// Should have sent out broadcast request for wants
130
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
131 132 133
		t.Fatal("did not enqueue correct initial number of wants")
	}

dirkmc's avatar
dirkmc committed
134
	// Simulate receiving HAVEs from several peers
135
	peers := testutil.GeneratePeers(5)
hannahhoward's avatar
hannahhoward committed
136
	for i, p := range peers {
137
		blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
dirkmc's avatar
dirkmc committed
138
		session.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{blk.Cid()}, []cid.Cid{})
hannahhoward's avatar
hannahhoward committed
139 140
	}

141 142
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
143
	// Verify new peers were recorded
Dirk McCormick's avatar
Dirk McCormick committed
144
	if !testutil.MatchPeersIgnoreOrder(fspm.Peers(), peers) {
dirkmc's avatar
dirkmc committed
145
		t.Fatal("peers not recorded by the peer manager")
hannahhoward's avatar
hannahhoward committed
146 147
	}

dirkmc's avatar
dirkmc committed
148 149 150 151
	// Verify session still wants received blocks
	_, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
152 153
	}

dirkmc's avatar
dirkmc committed
154 155
	// Simulate receiving DONT_HAVE for a CID
	session.ReceiveFrom(peers[0], []cid.Cid{}, []cid.Cid{}, []cid.Cid{blks[0].Cid()})
156

157 158
	time.Sleep(10 * time.Millisecond)

dirkmc's avatar
dirkmc committed
159 160 161 162
	// Verify session still wants received blocks
	_, unwanted = sim.SplitWantedUnwanted(blks)
	if len(unwanted) > 0 {
		t.Fatal("all blocks should still be wanted")
hannahhoward's avatar
hannahhoward committed
163 164
	}

dirkmc's avatar
dirkmc committed
165 166 167
	// Simulate receiving block for a CID
	session.ReceiveFrom(peers[1], []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

Dirk McCormick's avatar
Dirk McCormick committed
168
	time.Sleep(10 * time.Millisecond)
169

dirkmc's avatar
dirkmc committed
170 171 172 173
	// Verify session no longer wants received block
	wanted, unwanted := sim.SplitWantedUnwanted(blks)
	if len(unwanted) != 1 || !unwanted[0].Cid().Equals(blks[0].Cid()) {
		t.Fatal("session wants block that has already been received")
hannahhoward's avatar
hannahhoward committed
174
	}
dirkmc's avatar
dirkmc committed
175 176
	if len(wanted) != len(blks)-1 {
		t.Fatal("session wants incorrect number of blocks")
177
	}
Dirk McCormick's avatar
Dirk McCormick committed
178 179 180 181 182 183 184

	// Shut down session
	cancel()

	time.Sleep(10 * time.Millisecond)

	// Verify wants were cancelled
Dirk McCormick's avatar
Dirk McCormick committed
185
	if len(fpm.allCancels()) != len(blks) {
Dirk McCormick's avatar
Dirk McCormick committed
186 187
		t.Fatal("expected cancels to be sent for all wants")
	}
hannahhoward's avatar
hannahhoward committed
188 189 190
}

func TestSessionFindMorePeers(t *testing.T) {
191
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
192
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
193 194
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
195
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
196 197
	sim := bssim.New()
	bpm := bsbpm.New()
198 199
	notif := notifications.New()
	defer notif.Shutdown()
hannahhoward's avatar
hannahhoward committed
200
	id := testutil.GenerateSessionID()
201
	session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
202
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
203
	blockGenerator := blocksutil.NewBlockGenerator()
204
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
205 206 207 208
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
dirkmc's avatar
dirkmc committed
209
	_, err := session.GetBlocks(ctx, cids)
hannahhoward's avatar
hannahhoward committed
210 211 212 213
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
214
	// The session should initially broadcast want-haves
215
	select {
Dirk McCormick's avatar
Dirk McCormick committed
216
	case <-fpm.wantReqs:
217 218 219
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
220

hannahhoward's avatar
hannahhoward committed
221
	// receive a block to trigger a tick reset
222 223 224
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
225
	p := testutil.GeneratePeers(1)[0]
226 227

	blk := blks[0]
dirkmc's avatar
dirkmc committed
228 229 230 231
	session.ReceiveFrom(p, []cid.Cid{blk.Cid()}, []cid.Cid{}, []cid.Cid{})

	// The session should now time out waiting for a response and broadcast
	// want-haves again
232
	select {
Dirk McCormick's avatar
Dirk McCormick committed
233
	case <-fpm.wantReqs:
234 235 236
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
237

238
	// The session should keep broadcasting periodically until it receives a response
239
	select {
Dirk McCormick's avatar
Dirk McCormick committed
240
	case receivedWantReq := <-fpm.wantReqs:
241
		if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
242 243
			t.Fatal("did not rebroadcast whole live list")
		}
244 245 246 247 248 249 250
		// Make sure the first block is not included because it has already
		// been received
		for _, c := range receivedWantReq.cids {
			if c.Equals(cids[0]) {
				t.Fatal("should not braodcast block that was already received")
			}
		}
251 252
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
253
	}
254

dirkmc's avatar
dirkmc committed
255
	// The session should eventually try to find more peers
256
	select {
257
	case <-fpf.findMorePeersRequested:
258 259
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
260 261
	}
}
262

263 264 265
func TestSessionOnPeersExhausted(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
266 267
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
268 269
	fpf := newFakeProviderFinder()

270 271 272 273 274
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
275
	session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
276 277 278 279 280 281 282 283 284 285 286 287 288
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	_, err := session.GetBlocks(ctx, cids)

	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
289
	receivedWantReq := <-fpm.wantReqs
290 291 292 293 294 295 296 297 298 299

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
		t.Fatal("did not enqueue correct initial number of wants")
	}

	// Signal that all peers have send DONT_HAVE for two of the wants
	session.onPeersExhausted(cids[len(cids)-2:])

	// Wait for want request
Dirk McCormick's avatar
Dirk McCormick committed
300
	receivedWantReq = <-fpm.wantReqs
301 302 303 304 305 306 307

	// Should have sent out broadcast request for wants
	if len(receivedWantReq.cids) != 2 {
		t.Fatal("did not enqueue correct initial number of wants")
	}
}

308
func TestSessionFailingToGetFirstBlock(t *testing.T) {
309
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
310
	defer cancel()
Dirk McCormick's avatar
Dirk McCormick committed
311 312
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
313
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
314 315
	sim := bssim.New()
	bpm := bsbpm.New()
316 317
	notif := notifications.New()
	defer notif.Shutdown()
318
	id := testutil.GenerateSessionID()
319
	session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
320 321 322 323 324 325 326 327 328 329 330 331
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(4)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	startTick := time.Now()
	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

dirkmc's avatar
dirkmc committed
332
	// The session should initially broadcast want-haves
333
	select {
Dirk McCormick's avatar
Dirk McCormick committed
334
	case <-fpm.wantReqs:
335 336 337 338
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}

dirkmc's avatar
dirkmc committed
339
	// Verify a broadcast was made
340
	select {
Dirk McCormick's avatar
Dirk McCormick committed
341
	case receivedWantReq := <-fpm.wantReqs:
342 343 344 345 346 347 348
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}

dirkmc's avatar
dirkmc committed
349
	// Wait for a request to find more peers to occur
350
	select {
351
	case k := <-fpf.findMorePeersRequested:
352 353 354 355 356 357 358 359
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
	}
	firstTickLength := time.Since(startTick)

dirkmc's avatar
dirkmc committed
360
	// Wait for another broadcast to occur
361
	select {
Dirk McCormick's avatar
Dirk McCormick committed
362
	case receivedWantReq := <-fpm.wantReqs:
363 364 365 366 367 368
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
369 370

	// Wait for another broadcast to occur
371 372
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
373
	case receivedWantReq := <-fpm.wantReqs:
374 375 376 377 378 379
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
380 381

	// Tick should take longer
382 383 384 385
	consecutiveTickLength := time.Since(startTick)
	if firstTickLength > consecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}
dirkmc's avatar
dirkmc committed
386 387

	// Wait for another broadcast to occur
388 389
	startTick = time.Now()
	select {
Dirk McCormick's avatar
Dirk McCormick committed
390
	case receivedWantReq := <-fpm.wantReqs:
391 392 393 394 395 396
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
dirkmc's avatar
dirkmc committed
397 398

	// Tick should take longer
399 400 401 402 403
	secondConsecutiveTickLength := time.Since(startTick)
	if consecutiveTickLength > secondConsecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}

dirkmc's avatar
dirkmc committed
404
	// Should not have tried to find peers on consecutive ticks
405
	select {
406
	case <-fpf.findMorePeersRequested:
dirkmc's avatar
dirkmc committed
407
		t.Fatal("Should not have tried to find peers on consecutive ticks")
408 409 410
	default:
	}

dirkmc's avatar
dirkmc committed
411
	// Wait for rebroadcast to occur
412
	select {
413
	case k := <-fpf.findMorePeersRequested:
414 415 416 417 418 419 420
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not rebroadcast to find more peers")
	}
}
421 422

func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
Dirk McCormick's avatar
Dirk McCormick committed
423 424
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
425
	fpf := newFakeProviderFinder()
dirkmc's avatar
dirkmc committed
426 427
	sim := bssim.New()
	bpm := bsbpm.New()
428 429 430 431 432 433
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()

	// Create a new session with its own context
	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
434
	session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462

	timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer timerCancel()

	// Request a block with a new context
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(1)
	getctx, getcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer getcancel()

	getBlocksCh, err := session.GetBlocks(getctx, []cid.Cid{blks[0].Cid()})
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Cancel the session context
	sesscancel()

	// Expect the GetBlocks() channel to be closed
	select {
	case _, ok := <-getBlocksCh:
		if ok {
			t.Fatal("expected channel to be closed but was not closed")
		}
	case <-timerCtx.Done():
		t.Fatal("expected channel to be closed before timeout")
	}
}
dirkmc's avatar
dirkmc committed
463 464 465

func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
	ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond)
Dirk McCormick's avatar
Dirk McCormick committed
466 467
	fpm := newFakePeerManager()
	fspm := newFakeSessionPeerManager()
468 469
	fpf := newFakeProviderFinder()

dirkmc's avatar
dirkmc committed
470 471 472 473 474
	sim := bssim.New()
	bpm := bsbpm.New()
	notif := notifications.New()
	defer notif.Shutdown()
	id := testutil.GenerateSessionID()
475
	session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
dirkmc's avatar
dirkmc committed
476 477 478 479 480 481 482 483 484 485
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(2)
	cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// Wait for initial want request
Dirk McCormick's avatar
Dirk McCormick committed
486
	<-fpm.wantReqs
dirkmc's avatar
dirkmc committed
487 488 489 490 491 492 493 494 495 496 497 498

	// Shut down session
	cancelCtx()

	// Simulate receiving block for a CID
	peer := testutil.GeneratePeers(1)[0]
	session.ReceiveFrom(peer, []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})

	time.Sleep(5 * time.Millisecond)

	// If we don't get a panic then the test is considered passing
}