requestmanager_test.go 23.6 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7
	"reflect"
8
	"sync"
9 10 11
	"testing"
	"time"

12
	"github.com/ipfs/go-graphsync"
13
	"github.com/ipfs/go-graphsync/requestmanager/types"
14
	"github.com/libp2p/go-libp2p-core/peer"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22
	blocks "github.com/ipfs/go-block-format"
23
	cid "github.com/ipfs/go-cid"
24 25 26 27 28 29
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
30 31
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
32
}
33

34 35 36 37
// fakePeerHandler is a test double whose SendRequest method forwards every
// call onto requestRecordChan so tests can inspect what was sent.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

38 39
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
40
	fph.requestRecordChan <- requestRecord{
41 42
		gsr: graphSyncRequest,
		p:   p,
43 44 45
	}
}

46
type requestKey struct {
47
	requestID graphsync.RequestID
48 49 50 51 52 53
	link      ipld.Link
}

type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
54
	responses          chan map[graphsync.RequestID]metadata.Metadata
55 56 57 58 59 60
	blks               chan []blocks.Block
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
61
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
62 63 64
		blks:             make(chan []blocks.Block, 1),
	}
}
65
// StartRequest is a no-op in the fake.
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) {
}
67
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
// verifyLastProcessedBlocks waits for the next block slice recorded by
// ProcessResponse and fails the test if ctx finishes first or if the recorded
// blocks are not deeply equal to expectedBlocks.
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
	var got []blocks.Block
	select {
	case got = <-fal.blks:
	case <-ctx.Done():
		t.Fatal("should have processed blocks but didn't")
	}
	if reflect.DeepEqual(got, expectedBlocks) {
		return
	}
	t.Fatal("Did not process correct blocks")
}
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
83
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
84 85 86 87 88 89 90 91 92 93
	select {
	case <-ctx.Done():
		t.Fatal("should have processed responses but didn't")
	case responses := <-fal.responses:
		if !reflect.DeepEqual(responses, expectedResponses) {
			t.Fatal("Did not send proper metadata")
		}
	}
}

94
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			t.Fatal("request not properly cleaned up")
		}
	}
	fal.responseChannelsLk.Unlock()
}

// cidsForBlocks returns the CID of every block in blks, in the same order.
func cidsForBlocks(blks []blocks.Block) []cid.Cid {
	// Size the result from the input rather than a fixed guess.
	cids := make([]cid.Cid, 0, len(blks))
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	return cids
}

112
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
113 114 115 116 117 118 119 120 121 122
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

123
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
124 125
	return fal.asyncLoad(requestID, link)
}
126 127
// CompleteResponsesFor is a no-op in the fake.
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
128 129 130 131 132 133 134 135 136
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

137
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
138 139 140 141 142
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

143
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
144 145 146 147 148
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
// readNNetworkRequests reads exactly count records from requestRecordChan and
// returns them in arrival order. If ctx finishes before count records have
// arrived, the test is failed with the expected and observed counts.
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
		select {
		case rr := <-requestRecordChan:
			requestRecords = append(requestRecords, rr)
		case <-ctx.Done():
			// Report the real numbers; callers ask for counts other than two.
			t.Fatalf("should have sent %d requests to the network but only saw %d", count, len(requestRecords))
		}
	}
	return requestRecords
}

165 166 167 168 169 170 171 172 173 174 175
// metadataForBlocks builds one metadata item per block, in order, linking to
// the block's CID and marking every item with the same present flag.
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	items := make(metadata.Metadata, len(blks))
	for i := range blks {
		items[i] = metadata.Item{
			BlockPresent: present,
			Link:         cidlink.Link{Cid: blks[i].Cid()},
		}
	}
	return items
}

176
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
177
	md := metadataForBlocks(blks, present)
178
	metadataEncoded, err := metadata.EncodeMetadata(md)
179 180 181
	if err != nil {
		t.Fatal("did not encode metadata")
	}
182 183
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
184 185
		Data: metadataEncoded,
	}
186 187
}

188 189 190 191
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
192
	fal := newFakeAsyncLoader()
193
	requestManager := New(ctx, fal)
194 195 196 197 198
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
199
	peers := testutil.GeneratePeers(1)
200

201 202 203 204
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
205

206 207
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
208

209
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
210

211
	if requestRecords[0].p != peers[0] || requestRecords[1].p != peers[0] ||
212 213 214
		requestRecords[0].gsr.IsCancel() != false || requestRecords[1].gsr.IsCancel() != false ||
		requestRecords[0].gsr.Priority() != maxPriority ||
		requestRecords[1].gsr.Priority() != maxPriority {
215 216 217
		t.Fatal("did not send correct requests")
	}

218
	if !reflect.DeepEqual(blockChain1.Selector(), requestRecords[0].gsr.Selector()) {
219 220
		t.Fatal("did not encode selector properly")
	}
221
	if !reflect.DeepEqual(blockChain2.Selector(), requestRecords[1].gsr.Selector()) {
222 223 224
		t.Fatal("did not encode selector properly")
	}

225 226
	firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
227
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
228 229 230
	if err != nil {
		t.Fatal("did not encode metadata")
	}
231
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
232
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
233 234 235
	if err != nil {
		t.Fatal("did not encode metadata")
	}
236
	firstResponses := []gsmsg.GraphSyncResponse{
237 238
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
239 240
			Data: firstMetadataEncoded1,
		}),
241 242
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
243 244
			Data: firstMetadataEncoded2,
		}),
245 246
	}

247 248
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
249
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
250 251 252
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
253 254
	fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
	fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
255

256 257
	blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
258

259
	moreBlocks := blockChain2.RemainderBlocks(3)
260
	moreMetadata := metadataForBlocks(moreBlocks, true)
261
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
262 263 264
	if err != nil {
		t.Fatal("did not encode metadata")
	}
265
	moreResponses := []gsmsg.GraphSyncResponse{
266 267
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
268 269
			Data: moreMetadataEncoded,
		}),
270 271
	}

272 273
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
274
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
275 276
		requestRecords[1].gsr.ID(): moreMetadata,
	})
277

278 279
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

280
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
281 282
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
283 284 285 286 287 288
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
289
	fal := newFakeAsyncLoader()
290
	requestManager := New(ctx, fal)
291 292 293 294 295 296 297
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
298
	peers := testutil.GeneratePeers(1)
299

300 301 302
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
303

304 305
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
306

307
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
308

309 310
	firstBlocks := blockChain.Blocks(0, 3)
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
311
	firstResponses := []gsmsg.GraphSyncResponse{
312 313
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
314 315
	}

316
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
317

318 319 320
	fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
321
	cancel1()
322
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
323
	if rr.gsr.IsCancel() != true || rr.gsr.ID() != requestRecords[0].gsr.ID() {
324
		t.Fatal("did not send correct cancel message over network")
325 326
	}

327 328
	moreBlocks := blockChain.RemainderBlocks(3)
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
329
	moreResponses := []gsmsg.GraphSyncResponse{
330 331
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
332
	}
333
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
334 335
	fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
336

337 338
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
339 340
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
341 342 343 344 345 346 347
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
348
	fal := newFakeAsyncLoader()
349
	requestManager := New(managerCtx, fal)
350 351 352 353
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
354
	peers := testutil.GeneratePeers(1)
355

356 357 358
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
359

360
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
361

362
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
363

364
	firstBlocks := blockChain.Blocks(0, 3)
365
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
366
	firstResponses := []gsmsg.GraphSyncResponse{
367
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
368
	}
369 370
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
371
	blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
372 373
	managerCancel()

374
	moreBlocks := blockChain.RemainderBlocks(3)
375
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
376
	moreResponses := []gsmsg.GraphSyncResponse{
377
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
378
	}
379 380
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
381
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
382
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
383 384
}

385
/*func TestInvalidSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)

	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
}*/
407 408 409 410 411

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
412
	fal := newFakeAsyncLoader()
413
	requestManager := New(ctx, fal)
414 415 416 417 418 419 420
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

421 422 423 424
	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
425

426 427
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
428
}
429 430 431 432 433

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
434
	fal := newFakeAsyncLoader()
435
	requestManager := New(ctx, fal)
436 437 438 439 440
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
441
	peers := testutil.GeneratePeers(1)
442

443 444 445 446 447
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
448 449

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
450
	failedResponses := []gsmsg.GraphSyncResponse{
451
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
452
	}
453
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
454

455 456
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
457 458 459 460 461 462 463
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
464
	requestManager := New(ctx, fal)
465 466 467 468 469 470 471
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

472 473 474 475 476
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
477 478 479 480

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
481
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
482

483
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
484 485 486

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
487
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
488 489 490
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
491
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
492 493 494 495 496 497 498 499

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
500
	requestManager := New(ctx, fal)
501 502 503 504 505 506 507
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

508 509 510 511
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
512 513 514 515

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
516
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
517

518
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
519

520
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
521
	firstResponses := []gsmsg.GraphSyncResponse{
522
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
523
	}
524
	requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
525 526

	fal.verifyNoRemainingData(t, rr.gsr.ID())
527
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
528 529 530 531 532 533 534
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
535
	requestManager := New(ctx, fal)
536 537 538 539 540 541 542
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

543 544 545 546
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
547 548 549

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

550
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
551
	firstResponses := []gsmsg.GraphSyncResponse{
552
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
553 554
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
555
	for _, block := range blockChain.AllBlocks() {
556 557
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
558 559
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
560 561
	if len(errs) == 0 {
		t.Fatal("did not send  errors")
562 563
	}

564
}
565 566 567 568 569 570

func TestEncodingExtensions(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
571
	requestManager := New(ctx, fal)
572 573 574 575 576 577 578
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

579 580 581
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
582 583 584 585 586 587 588 589 590 591 592 593 594

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
595 596 597 598 599 600 601 602 603 604 605 606

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData) error {
		data, has := responseData.Extension(extensionName1)
		if !has {
			t.Fatal("Did not receive extension data in response")
		}
		receivedExtensionData <- data
		return <-expectedError
	}
	requestManager.RegisterHook(hook)
607
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
608 609 610 611 612 613 614 615 616 617 618 619 620 621

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
	if !found || !reflect.DeepEqual(extensionData1, returnedData1) {
		t.Fatal("Failed to encode first extension")
	}

	returnedData2, found := gsr.Extension(extensionName2)
	if !found || !reflect.DeepEqual(extensionData2, returnedData2) {
		t.Fatal("Failed to encode first extension")
	}

622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
		requestManager.ProcessResponses(peers[0], firstResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, expectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		nextExpectedData := testutil.RandomBytes(100)

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
		requestManager.ProcessResponses(peers[0], secondResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, nextExpectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
673
}