requestmanager_test.go 22.5 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7
	"reflect"
8
	"sync"
9 10 11
	"testing"
	"time"

12
	"github.com/ipfs/go-graphsync"
13
	"github.com/ipfs/go-graphsync/requestmanager/types"
14
	"github.com/libp2p/go-libp2p-core/peer"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22 23 24 25 26 27
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
28 29
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
30
}
31

32 33 34 35
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

36 37
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
38
	fph.requestRecordChan <- requestRecord{
39 40
		gsr: graphSyncRequest,
		p:   p,
41 42 43
	}
}

44
type requestKey struct {
45
	requestID graphsync.RequestID
46 47 48 49 50 51
	link      ipld.Link
}

type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
52
	responses          chan map[graphsync.RequestID]metadata.Metadata
53 54 55 56 57 58
	blks               chan []blocks.Block
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
59
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
60 61 62
		blks:             make(chan []blocks.Block, 1),
	}
}
63
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) {
64
}
65
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
	select {
	case <-ctx.Done():
		t.Fatal("should have processed blocks but didn't")
	case processedBlocks := <-fal.blks:
		if !reflect.DeepEqual(processedBlocks, expectedBlocks) {
			t.Fatal("Did not process correct blocks")
		}
	}
}
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
81
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
82 83 84 85 86 87 88 89 90 91
	select {
	case <-ctx.Done():
		t.Fatal("should have processed responses but didn't")
	case responses := <-fal.responses:
		if !reflect.DeepEqual(responses, expectedResponses) {
			t.Fatal("Did not send proper metadata")
		}
	}
}

92
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
93 94 95 96 97 98 99 100 101
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			t.Fatal("request not properly cleaned up")
		}
	}
	fal.responseChannelsLk.Unlock()
}

102
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
103 104 105 106 107 108 109 110 111 112
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

113
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
114 115
	return fal.asyncLoad(requestID, link)
}
116 117
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
118 119 120 121 122 123 124 125 126
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

127
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
128 129 130 131 132
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

133
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
134 135 136 137 138
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
		select {
		case rr := <-requestRecordChan:
			requestRecords = append(requestRecords, rr)
		case <-ctx.Done():
			t.Fatal("should have sent two requests to the network but did not")
		}
	}
	return requestRecords
}

155 156 157 158 159 160 161 162 163 164 165
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
			Link:         cidlink.Link{Cid: block.Cid()},
			BlockPresent: present,
		})
	}
	return md
}

166
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
167
	md := metadataForBlocks(blks, present)
168
	metadataEncoded, err := metadata.EncodeMetadata(md)
169 170 171
	if err != nil {
		t.Fatal("did not encode metadata")
	}
172 173
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
174 175
		Data: metadataEncoded,
	}
176 177
}

178 179 180 181
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
182
	fal := newFakeAsyncLoader()
183
	requestManager := New(ctx, fal)
184 185 186 187 188
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
189
	peers := testutil.GeneratePeers(1)
190

191
	blockStore := make(map[ipld.Link][]byte)
192
	loader, storer := testutil.NewTestStore(blockStore)
193 194
	blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
195

196 197
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
198

199
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
200

201
	if requestRecords[0].p != peers[0] || requestRecords[1].p != peers[0] ||
202 203 204
		requestRecords[0].gsr.IsCancel() != false || requestRecords[1].gsr.IsCancel() != false ||
		requestRecords[0].gsr.Priority() != maxPriority ||
		requestRecords[1].gsr.Priority() != maxPriority {
205 206 207
		t.Fatal("did not send correct requests")
	}

208
	if !reflect.DeepEqual(blockChain1.Selector(), requestRecords[0].gsr.Selector()) {
209 210
		t.Fatal("did not encode selector properly")
	}
211
	if !reflect.DeepEqual(blockChain2.Selector(), requestRecords[1].gsr.Selector()) {
212 213 214
		t.Fatal("did not encode selector properly")
	}

215 216
	firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
217
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
218 219 220
	if err != nil {
		t.Fatal("did not encode metadata")
	}
221
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
222
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
223 224 225
	if err != nil {
		t.Fatal("did not encode metadata")
	}
226
	firstResponses := []gsmsg.GraphSyncResponse{
227 228
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
229 230
			Data: firstMetadataEncoded1,
		}),
231 232
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
233 234
			Data: firstMetadataEncoded2,
		}),
235 236
	}

237 238
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
239
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
240 241 242
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
243 244
	fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
	fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
245

246 247
	blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
248

249
	moreBlocks := blockChain2.RemainderBlocks(3)
250
	moreMetadata := metadataForBlocks(moreBlocks, true)
251
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
252 253 254
	if err != nil {
		t.Fatal("did not encode metadata")
	}
255
	moreResponses := []gsmsg.GraphSyncResponse{
256 257
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
258 259
			Data: moreMetadataEncoded,
		}),
260 261
	}

262 263
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
264
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
265 266
		requestRecords[1].gsr.ID(): moreMetadata,
	})
267

268 269
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

270
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
271 272
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
273 274 275 276 277 278
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
279
	fal := newFakeAsyncLoader()
280
	requestManager := New(ctx, fal)
281 282 283 284 285 286 287
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
288
	peers := testutil.GeneratePeers(1)
289

290
	blockStore := make(map[ipld.Link][]byte)
291
	loader, storer := testutil.NewTestStore(blockStore)
292
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
293

294 295
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
296

297
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
298

299 300
	firstBlocks := blockChain.Blocks(0, 3)
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
301
	firstResponses := []gsmsg.GraphSyncResponse{
302 303
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
304 305
	}

306
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
307

308 309 310
	fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
311
	cancel1()
312
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
313
	if rr.gsr.IsCancel() != true || rr.gsr.ID() != requestRecords[0].gsr.ID() {
314
		t.Fatal("did not send correct cancel message over network")
315 316
	}

317 318
	moreBlocks := blockChain.RemainderBlocks(3)
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
319
	moreResponses := []gsmsg.GraphSyncResponse{
320 321
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
322
	}
323
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
324 325
	fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
326

327 328
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
329 330
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
331 332 333 334 335 336 337
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
338
	fal := newFakeAsyncLoader()
339
	requestManager := New(managerCtx, fal)
340 341 342 343
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
344
	peers := testutil.GeneratePeers(1)
345

346
	blockStore := make(map[ipld.Link][]byte)
347
	loader, storer := testutil.NewTestStore(blockStore)
348
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
349

350
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
351

352
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
353

354
	firstBlocks := blockChain.Blocks(0, 3)
355
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
356
	firstResponses := []gsmsg.GraphSyncResponse{
357
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
358
	}
359 360
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
361
	blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
362 363
	managerCancel()

364
	moreBlocks := blockChain.RemainderBlocks(3)
365
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
366
	moreResponses := []gsmsg.GraphSyncResponse{
367
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
368
	}
369 370
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
371
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
372
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
373 374 375 376 377 378
}

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
379
	fal := newFakeAsyncLoader()
380
	requestManager := New(ctx, fal)
381 382 383 384 385 386 387
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

388 389
	s := testutil.NewUnencodableSelectorSpec()
	r := cidlink.Link{Cid: testutil.GenerateCids(1)[0]}
390
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
391

392 393
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
394
}
395 396 397 398 399

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
400
	fal := newFakeAsyncLoader()
401
	requestManager := New(ctx, fal)
402 403 404 405 406
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
407
	peers := testutil.GeneratePeers(1)
408

409
	blockStore := make(map[ipld.Link][]byte)
410
	loader, storer := testutil.NewTestStore(blockStore)
411 412 413
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
414 415

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
416
	failedResponses := []gsmsg.GraphSyncResponse{
417
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
418
	}
419
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
420

421 422
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
423 424 425 426 427 428 429
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
430
	requestManager := New(ctx, fal)
431 432 433 434 435 436 437
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

438
	blockStore := make(map[ipld.Link][]byte)
439
	loader, storer := testutil.NewTestStore(blockStore)
440 441 442
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
443 444 445 446

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
447
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
448

449
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
450 451 452

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
453
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
454 455 456
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
457
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
458 459 460 461 462 463 464 465

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
466
	requestManager := New(ctx, fal)
467 468 469 470 471 472 473
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

474
	blockStore := make(map[ipld.Link][]byte)
475
	loader, storer := testutil.NewTestStore(blockStore)
476 477
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
478 479 480 481

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
482
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
483

484
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
485

486
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
487
	firstResponses := []gsmsg.GraphSyncResponse{
488
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
489
	}
490
	requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
491 492

	fal.verifyNoRemainingData(t, rr.gsr.ID())
493
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
494 495 496 497 498 499 500
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
501
	requestManager := New(ctx, fal)
502 503 504 505 506 507 508
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

509
	blockStore := make(map[ipld.Link][]byte)
510
	loader, storer := testutil.NewTestStore(blockStore)
511 512
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
513 514 515

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

516
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
517
	firstResponses := []gsmsg.GraphSyncResponse{
518
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
519 520
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
521
	for _, block := range blockChain.AllBlocks() {
522 523
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
524 525
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
526 527
	if len(errs) == 0 {
		t.Fatal("did not send  errors")
528 529
	}

530
}
531 532 533 534 535 536

func TestEncodingExtensions(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
537
	requestManager := New(ctx, fal)
538 539 540 541 542 543 544
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

545
	blockStore := make(map[ipld.Link][]byte)
546
	loader, storer := testutil.NewTestStore(blockStore)
547
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
548 549 550 551 552 553 554 555 556 557 558 559 560

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
561 562 563 564 565 566 567 568 569 570 571 572

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData) error {
		data, has := responseData.Extension(extensionName1)
		if !has {
			t.Fatal("Did not receive extension data in response")
		}
		receivedExtensionData <- data
		return <-expectedError
	}
	requestManager.RegisterHook(hook)
573
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
574 575 576 577 578 579 580 581 582 583 584 585 586 587

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
	if !found || !reflect.DeepEqual(extensionData1, returnedData1) {
		t.Fatal("Failed to encode first extension")
	}

	returnedData2, found := gsr.Extension(extensionName2)
	if !found || !reflect.DeepEqual(extensionData2, returnedData2) {
		t.Fatal("Failed to encode first extension")
	}

588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
		requestManager.ProcessResponses(peers[0], firstResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, expectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		nextExpectedData := testutil.RandomBytes(100)

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
		requestManager.ProcessResponses(peers[0], secondResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, nextExpectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
639
}