requestmanager_test.go 21.5 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"fmt"
6
	"reflect"
7
	"sync"
8 9 10
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12 13
	"github.com/ipfs/go-graphsync/ipldbridge"
	"github.com/ipfs/go-graphsync/requestmanager/types"
14
	"github.com/libp2p/go-libp2p-core/peer"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22
	blocks "github.com/ipfs/go-block-format"
23
	cid "github.com/ipfs/go-cid"
24 25 26 27 28 29
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
30 31
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
32
}
33

34 35 36 37
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

38 39
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
40
	fph.requestRecordChan <- requestRecord{
41 42
		gsr: graphSyncRequest,
		p:   p,
43 44 45
	}
}

46
type requestKey struct {
47
	requestID graphsync.RequestID
48 49 50 51 52 53
	link      ipld.Link
}

type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
54
	responses          chan map[graphsync.RequestID]metadata.Metadata
55 56 57 58 59 60
	blks               chan []blocks.Block
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
61
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
62 63 64
		blks:             make(chan []blocks.Block, 1),
	}
}
65
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) {
66
}
67
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
	select {
	case <-ctx.Done():
		t.Fatal("should have processed blocks but didn't")
	case processedBlocks := <-fal.blks:
		if !reflect.DeepEqual(processedBlocks, expectedBlocks) {
			t.Fatal("Did not process correct blocks")
		}
	}
}
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
83
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
84 85 86 87 88 89 90 91 92 93
	select {
	case <-ctx.Done():
		t.Fatal("should have processed responses but didn't")
	case responses := <-fal.responses:
		if !reflect.DeepEqual(responses, expectedResponses) {
			t.Fatal("Did not send proper metadata")
		}
	}
}

94
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			t.Fatal("request not properly cleaned up")
		}
	}
	fal.responseChannelsLk.Unlock()
}

func cidsForBlocks(blks []blocks.Block) []cid.Cid {
	cids := make([]cid.Cid, 0, 5)
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	return cids
}

112
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
113 114 115 116 117 118 119 120 121 122
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

123
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
124 125
	return fal.asyncLoad(requestID, link)
}
126 127
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
128 129 130 131 132 133 134 135 136
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

137
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
138 139 140 141 142
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

143
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
144 145 146 147 148
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
		select {
		case rr := <-requestRecordChan:
			requestRecords = append(requestRecords, rr)
		case <-ctx.Done():
			t.Fatal("should have sent two requests to the network but did not")
		}
	}
	return requestRecords
}

165
func verifyMatchedResponses(t *testing.T, actualResponse []graphsync.ResponseProgress, expectedBlocks []blocks.Block) {
166 167
	if len(actualResponse) != len(expectedBlocks) {
		t.Fatal("wrong number of responses sent")
168
	}
169 170 171 172 173 174 175 176 177
	for _, responseProgress := range actualResponse {
		data, err := responseProgress.Node.AsBytes()
		if err != nil {
			t.Fatal("Node was not a block")
		}
		blk, err := blocks.NewBlockWithCid(data, responseProgress.LastBlock.Link.(cidlink.Link).Cid)
		if err != nil {
			t.Fatal("block did not verify")
		}
178 179 180 181 182 183
		if !testutil.ContainsBlock(expectedBlocks, blk) {
			t.Fatal("wrong block sent")
		}
	}
}

184 185 186 187 188 189 190 191 192 193 194
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
			Link:         cidlink.Link{Cid: block.Cid()},
			BlockPresent: present,
		})
	}
	return md
}

195
func encodedMetadataForBlocks(t *testing.T, ipldBridge ipldbridge.IPLDBridge, blks []blocks.Block, present bool) graphsync.ExtensionData {
196 197 198 199 200
	md := metadataForBlocks(blks, present)
	metadataEncoded, err := metadata.EncodeMetadata(md, ipldBridge)
	if err != nil {
		t.Fatal("did not encode metadata")
	}
201 202
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
203 204
		Data: metadataEncoded,
	}
205 206
}

207 208 209 210 211
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
212 213
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
214 215 216 217 218
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
219
	peers := testutil.GeneratePeers(1)
220

221 222
	blocks1 := testutil.GenerateBlocksOfSize(5, 100)
	blocks2 := testutil.GenerateBlocksOfSize(5, 100)
223 224
	r1 := cidlink.Link{Cid: blocks1[0].Cid()}
	r2 := cidlink.Link{Cid: blocks2[0].Cid()}
225 226
	s1 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks1))
	s2 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks2))
227

228 229
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], r1, s1)
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], r2, s2)
230

231
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
232

233
	if requestRecords[0].p != peers[0] || requestRecords[1].p != peers[0] ||
234 235 236
		requestRecords[0].gsr.IsCancel() != false || requestRecords[1].gsr.IsCancel() != false ||
		requestRecords[0].gsr.Priority() != maxPriority ||
		requestRecords[1].gsr.Priority() != maxPriority {
237 238 239
		t.Fatal("did not send correct requests")
	}

240
	returnedS1, err := fakeIPLDBridge.DecodeNode(requestRecords[0].gsr.Selector())
241 242 243
	if err != nil || !reflect.DeepEqual(s1, returnedS1) {
		t.Fatal("did not encode selector properly")
	}
244
	returnedS2, err := fakeIPLDBridge.DecodeNode(requestRecords[1].gsr.Selector())
245 246 247 248
	if err != nil || !reflect.DeepEqual(s2, returnedS2) {
		t.Fatal("did not encode selector properly")
	}

249 250 251 252 253 254 255 256 257 258 259
	firstBlocks := append(blocks1, blocks2[:3]...)
	firstMetadata1 := metadataForBlocks(blocks1, true)
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1, fakeIPLDBridge)
	if err != nil {
		t.Fatal("did not encode metadata")
	}
	firstMetadata2 := metadataForBlocks(blocks2[:3], true)
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2, fakeIPLDBridge)
	if err != nil {
		t.Fatal("did not encode metadata")
	}
260
	firstResponses := []gsmsg.GraphSyncResponse{
261 262
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
263 264
			Data: firstMetadataEncoded1,
		}),
265 266
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
267 268
			Data: firstMetadataEncoded2,
		}),
269 270
	}

271 272
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
273
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
274 275 276 277 278 279
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
	fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1)
	fal.successResponseOn(requestRecords[1].gsr.ID(), blocks2[:3])

280
	responses1 := testutil.CollectResponses(requestCtx, t, returnedResponseChan1)
281
	verifyMatchedResponses(t, responses1, blocks1)
282
	responses2 := testutil.ReadNResponses(requestCtx, t, returnedResponseChan2, 3)
283 284 285 286 287 288 289 290
	verifyMatchedResponses(t, responses2, blocks2[:3])

	moreBlocks := blocks2[3:]
	moreMetadata := metadataForBlocks(moreBlocks, true)
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata, fakeIPLDBridge)
	if err != nil {
		t.Fatal("did not encode metadata")
	}
291
	moreResponses := []gsmsg.GraphSyncResponse{
292 293
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
294 295
			Data: moreMetadataEncoded,
		}),
296 297
	}

298 299
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
300
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
301 302
		requestRecords[1].gsr.ID(): moreMetadata,
	})
303

304 305
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

306
	responses2 = testutil.CollectResponses(requestCtx, t, returnedResponseChan2)
307
	verifyMatchedResponses(t, responses2, moreBlocks)
308 309
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
310 311 312 313 314 315 316
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
317 318
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
319 320 321 322 323 324 325
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
326
	peers := testutil.GeneratePeers(1)
327

328 329
	blocks1 := testutil.GenerateBlocksOfSize(5, 100)
	s1 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks1))
330
	r1 := cidlink.Link{Cid: blocks1[0].Cid()}
331

332 333
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], r1, s1)
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], r1, s1)
334

335
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
336

337 338
	firstBlocks := blocks1[:3]
	firstMetadata := encodedMetadataForBlocks(t, fakeIPLDBridge, blocks1[:3], true)
339
	firstResponses := []gsmsg.GraphSyncResponse{
340 341
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
342 343
	}

344
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
345

346 347
	fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1[:3])
	fal.successResponseOn(requestRecords[1].gsr.ID(), blocks1[:3])
348
	responses1 := testutil.ReadNResponses(requestCtx, t, returnedResponseChan1, 3)
349 350

	cancel1()
351
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
352
	if rr.gsr.IsCancel() != true || rr.gsr.ID() != requestRecords[0].gsr.ID() {
353
		t.Fatal("did not send correct cancel message over network")
354 355
	}

356 357
	moreBlocks := blocks1[3:]
	moreMetadata := encodedMetadataForBlocks(t, fakeIPLDBridge, blocks1[3:], true)
358
	moreResponses := []gsmsg.GraphSyncResponse{
359 360
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
361
	}
362 363 364 365
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1[3:])
	fal.successResponseOn(requestRecords[1].gsr.ID(), blocks1[3:])

366
	responses1 = append(responses1, testutil.CollectResponses(requestCtx, t, returnedResponseChan1)...)
367
	verifyMatchedResponses(t, responses1, blocks1[:3])
368
	responses2 := testutil.CollectResponses(requestCtx, t, returnedResponseChan2)
369
	verifyMatchedResponses(t, responses2, blocks1)
370 371
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
372 373 374 375 376 377 378 379
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
380 381
	fal := newFakeAsyncLoader()
	requestManager := New(managerCtx, fal, fakeIPLDBridge)
382 383 384 385
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
386
	peers := testutil.GeneratePeers(1)
387

388 389
	blocks := testutil.GenerateBlocksOfSize(5, 100)
	s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
390 391 392
	r := cidlink.Link{Cid: blocks[0].Cid()}

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
393

394
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
395

396 397
	firstBlocks := blocks[:3]
	firstMetadata := encodedMetadataForBlocks(t, fakeIPLDBridge, firstBlocks, true)
398
	firstResponses := []gsmsg.GraphSyncResponse{
399
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
400
	}
401 402
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
403
	responses := testutil.ReadNResponses(requestCtx, t, returnedResponseChan, 3)
404 405
	managerCancel()

406 407
	moreBlocks := blocks[3:]
	moreMetadata := encodedMetadataForBlocks(t, fakeIPLDBridge, moreBlocks, true)
408
	moreResponses := []gsmsg.GraphSyncResponse{
409
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
410
	}
411 412
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
413
	responses = append(responses, testutil.CollectResponses(requestCtx, t, returnedResponseChan)...)
414
	verifyMatchedResponses(t, responses, firstBlocks)
415
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
416 417 418 419 420 421 422
}

func TestInvalidSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
423 424
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
425 426 427 428 429 430 431
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

432 433 434 435
	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
436

437 438
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
439 440 441 442 443 444 445
}

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
446 447
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
448 449 450 451 452 453 454
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

455 456 457 458
	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
459

460 461
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
462
}
463 464 465 466 467 468

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
469 470
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
471 472 473 474 475
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
476
	peers := testutil.GeneratePeers(1)
477

478 479
	blocks := testutil.GenerateBlocksOfSize(5, 100)
	s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
480 481
	r := cidlink.Link{Cid: blocks[0].Cid()}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
482 483

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
484
	failedResponses := []gsmsg.GraphSyncResponse{
485
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
486
	}
487
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
488

489 490
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocks := testutil.GenerateBlocksOfSize(5, 100)
	s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
509 510
	r := cidlink.Link{Cid: blocks[0].Cid()}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
511 512 513 514 515 516

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
	fal.successResponseOn(rr.gsr.ID(), blocks)

517
	responses := testutil.CollectResponses(requestCtx, t, returnedResponseChan)
518 519 520 521
	verifyMatchedResponses(t, responses, blocks)

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
522
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
523 524 525
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
526
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocks := testutil.GenerateBlocksOfSize(5, 100)
	s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
546 547
	r := cidlink.Link{Cid: blocks[0].Cid()}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
548 549 550 551 552 553

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
	fal.successResponseOn(rr.gsr.ID(), blocks)

554
	responses := testutil.CollectResponses(requestCtx, t, returnedResponseChan)
555 556 557 558
	verifyMatchedResponses(t, responses, blocks)

	md := encodedMetadataForBlocks(t, fakeIPLDBridge, blocks, true)
	firstResponses := []gsmsg.GraphSyncResponse{
559
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
560 561 562 563
	}
	requestManager.ProcessResponses(peers[0], firstResponses, blocks)

	fal.verifyNoRemainingData(t, rr.gsr.ID())
564
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocks := testutil.GenerateBlocksOfSize(5, 100)
	s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
583 584
	r := cidlink.Link{Cid: blocks[0].Cid()}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
585 586 587 588 589

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	md := encodedMetadataForBlocks(t, fakeIPLDBridge, blocks, false)
	firstResponses := []gsmsg.GraphSyncResponse{
590
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
591 592 593 594 595
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range blocks {
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
596 597
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
598 599 600 601
	if len(errs) != len(blocks) {
		t.Fatal("did not send all errors")
	}

602
}