requestmanager_test.go 24.2 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7
	"reflect"
8
	"sync"
9 10 11
	"testing"
	"time"

12
	"github.com/ipfs/go-graphsync"
13
	"github.com/ipfs/go-graphsync/requestmanager/types"
14
	"github.com/libp2p/go-libp2p-core/peer"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22
	blocks "github.com/ipfs/go-block-format"
23
	cid "github.com/ipfs/go-cid"
24 25 26 27 28 29
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
30 31
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
32
}
33

34 35 36 37
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

38 39
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
40
	fph.requestRecordChan <- requestRecord{
41 42
		gsr: graphSyncRequest,
		p:   p,
43 44 45
	}
}

46
type requestKey struct {
47
	requestID graphsync.RequestID
48 49 50 51 52 53
	link      ipld.Link
}

type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
54
	responses          chan map[graphsync.RequestID]metadata.Metadata
55 56 57 58 59 60
	blks               chan []blocks.Block
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
61
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
62 63 64
		blks:             make(chan []blocks.Block, 1),
	}
}
65
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) {
66
}
67
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
	select {
	case <-ctx.Done():
		t.Fatal("should have processed blocks but didn't")
	case processedBlocks := <-fal.blks:
		if !reflect.DeepEqual(processedBlocks, expectedBlocks) {
			t.Fatal("Did not process correct blocks")
		}
	}
}
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
83
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
84 85 86 87 88 89 90 91 92 93
	select {
	case <-ctx.Done():
		t.Fatal("should have processed responses but didn't")
	case responses := <-fal.responses:
		if !reflect.DeepEqual(responses, expectedResponses) {
			t.Fatal("Did not send proper metadata")
		}
	}
}

94
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			t.Fatal("request not properly cleaned up")
		}
	}
	fal.responseChannelsLk.Unlock()
}

func cidsForBlocks(blks []blocks.Block) []cid.Cid {
	cids := make([]cid.Cid, 0, 5)
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	return cids
}

112
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
113 114 115 116 117 118 119 120 121 122
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

123
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
124 125
	return fal.asyncLoad(requestID, link)
}
126 127
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
128 129 130 131 132 133 134 135 136
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

137
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
138 139 140 141 142
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

143
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
144 145 146 147 148
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
		select {
		case rr := <-requestRecordChan:
			requestRecords = append(requestRecords, rr)
		case <-ctx.Done():
			t.Fatal("should have sent two requests to the network but did not")
		}
	}
	return requestRecords
}

165 166 167 168 169 170 171 172 173 174 175
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
			Link:         cidlink.Link{Cid: block.Cid()},
			BlockPresent: present,
		})
	}
	return md
}

176
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
177
	md := metadataForBlocks(blks, present)
178
	metadataEncoded, err := metadata.EncodeMetadata(md)
179 180 181
	if err != nil {
		t.Fatal("did not encode metadata")
	}
182 183
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
184 185
		Data: metadataEncoded,
	}
186 187
}

188 189 190 191 192
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
193 194
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
195 196 197 198 199
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
200
	peers := testutil.GeneratePeers(1)
201

202 203 204 205
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
206

207 208
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
209

210
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
211

212
	if requestRecords[0].p != peers[0] || requestRecords[1].p != peers[0] ||
213 214 215
		requestRecords[0].gsr.IsCancel() != false || requestRecords[1].gsr.IsCancel() != false ||
		requestRecords[0].gsr.Priority() != maxPriority ||
		requestRecords[1].gsr.Priority() != maxPriority {
216 217 218
		t.Fatal("did not send correct requests")
	}

219
	if !reflect.DeepEqual(blockChain1.Selector(), requestRecords[0].gsr.Selector()) {
220 221
		t.Fatal("did not encode selector properly")
	}
222
	if !reflect.DeepEqual(blockChain2.Selector(), requestRecords[1].gsr.Selector()) {
223 224 225
		t.Fatal("did not encode selector properly")
	}

226 227
	firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
228
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
229 230 231
	if err != nil {
		t.Fatal("did not encode metadata")
	}
232
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
233
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
234 235 236
	if err != nil {
		t.Fatal("did not encode metadata")
	}
237
	firstResponses := []gsmsg.GraphSyncResponse{
238 239
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
240 241
			Data: firstMetadataEncoded1,
		}),
242 243
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
244 245
			Data: firstMetadataEncoded2,
		}),
246 247
	}

248 249
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
250
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
251 252 253
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
254 255
	fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
	fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
256

257 258
	blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
259

260
	moreBlocks := blockChain2.RemainderBlocks(3)
261
	moreMetadata := metadataForBlocks(moreBlocks, true)
262
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
263 264 265
	if err != nil {
		t.Fatal("did not encode metadata")
	}
266
	moreResponses := []gsmsg.GraphSyncResponse{
267 268
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
269 270
			Data: moreMetadataEncoded,
		}),
271 272
	}

273 274
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
275
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
276 277
		requestRecords[1].gsr.ID(): moreMetadata,
	})
278

279 280
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

281
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
282 283
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
284 285 286 287 288 289 290
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
291 292
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
293 294 295 296 297 298 299
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
300
	peers := testutil.GeneratePeers(1)
301

302 303 304
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
305

306 307
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
308

309
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
310

311 312
	firstBlocks := blockChain.Blocks(0, 3)
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
313
	firstResponses := []gsmsg.GraphSyncResponse{
314 315
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
316 317
	}

318
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
319

320 321 322
	fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
323
	cancel1()
324
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
325
	if rr.gsr.IsCancel() != true || rr.gsr.ID() != requestRecords[0].gsr.ID() {
326
		t.Fatal("did not send correct cancel message over network")
327 328
	}

329 330
	moreBlocks := blockChain.RemainderBlocks(3)
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
331
	moreResponses := []gsmsg.GraphSyncResponse{
332 333
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
334
	}
335
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
336 337
	fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
338

339 340
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
341 342
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
343 344 345 346 347 348 349 350
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
351 352
	fal := newFakeAsyncLoader()
	requestManager := New(managerCtx, fal, fakeIPLDBridge)
353 354 355 356
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
357
	peers := testutil.GeneratePeers(1)
358

359 360 361
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
362

363
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
364

365
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
366

367
	firstBlocks := blockChain.Blocks(0, 3)
368
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
369
	firstResponses := []gsmsg.GraphSyncResponse{
370
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
371
	}
372 373
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
374
	blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
375 376
	managerCancel()

377
	moreBlocks := blockChain.RemainderBlocks(3)
378
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
379
	moreResponses := []gsmsg.GraphSyncResponse{
380
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
381
	}
382 383
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
384
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
385
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
386 387
}

388
/*func TestInvalidSelector(t *testing.T) {
389 390 391 392
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
393 394
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
395 396 397 398 399 400 401
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

402 403 404 405
	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
406

407 408
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
409
}*/
410 411 412 413 414 415

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
416 417
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
418 419 420 421 422 423 424
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

425 426 427 428
	cids := testutil.GenerateCids(5)
	s := testbridge.NewUnencodableSelectorSpec(cids)
	r := cidlink.Link{Cid: cids[0]}
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
429

430 431
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
432
}
433 434 435 436 437 438

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
439 440
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
441 442 443 444 445
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
446
	peers := testutil.GeneratePeers(1)
447

448 449 450 451 452
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
453 454

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
455
	failedResponses := []gsmsg.GraphSyncResponse{
456
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
457
	}
458
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
459

460 461
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

478 479 480 481 482
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
483 484 485 486

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
487
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
488

489
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
490 491 492

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
493
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
494 495 496
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
497
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

515 516 517 518
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
519 520 521 522

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
523
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
524

525
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
526

527
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
528
	firstResponses := []gsmsg.GraphSyncResponse{
529
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
530
	}
531
	requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
532 533

	fal.verifyNoRemainingData(t, rr.gsr.ID())
534
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

551 552 553 554
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
555 556 557

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

558
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
559
	firstResponses := []gsmsg.GraphSyncResponse{
560
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
561 562
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
563
	for _, block := range blockChain.AllBlocks() {
564 565
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
566 567
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
568 569
	if len(errs) == 0 {
		t.Fatal("did not send  errors")
570 571
	}

572
}
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587

func TestEncodingExtensions(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	fakeIPLDBridge := testbridge.NewMockIPLDBridge()
	ctx := context.Background()
	fal := newFakeAsyncLoader()
	requestManager := New(ctx, fal, fakeIPLDBridge)
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

588 589 590
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
591 592 593 594 595 596 597 598 599 600 601 602 603

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
604 605 606 607 608 609 610 611 612 613 614 615

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData) error {
		data, has := responseData.Extension(extensionName1)
		if !has {
			t.Fatal("Did not receive extension data in response")
		}
		receivedExtensionData <- data
		return <-expectedError
	}
	requestManager.RegisterHook(hook)
616
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
617 618 619 620 621 622 623 624 625 626 627 628 629 630

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
	if !found || !reflect.DeepEqual(extensionData1, returnedData1) {
		t.Fatal("Failed to encode first extension")
	}

	returnedData2, found := gsr.Extension(extensionName2)
	if !found || !reflect.DeepEqual(extensionData2, returnedData2) {
		t.Fatal("Failed to encode first extension")
	}

631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
		requestManager.ProcessResponses(peers[0], firstResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, expectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		nextExpectedData := testutil.RandomBytes(100)

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
		requestManager.ProcessResponses(peers[0], secondResponses, nil)
		select {
		case <-requestCtx.Done():
			t.Fatal("Should have checked extension but didn't")
		case received := <-receivedExtensionData:
			if !reflect.DeepEqual(received, nextExpectedData) {
				t.Fatal("Did not receive correct extension data from resposne")
			}
		}
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
682
}