requestmanager_test.go 22.3 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6 7
	"fmt"
	"sync"
8 9 10
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12
	"github.com/ipfs/go-graphsync/requestmanager/types"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22 23 24 25 26 27
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
28 29
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
30
}
31

32 33 34 35
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

36 37
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
38
	fph.requestRecordChan <- requestRecord{
39 40
		gsr: graphSyncRequest,
		p:   p,
41 42 43
	}
}

44
type requestKey struct {
45
	requestID graphsync.RequestID
46 47 48 49 50 51
	link      ipld.Link
}

type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
52
	responses          chan map[graphsync.RequestID]metadata.Metadata
53 54 55 56 57 58
	blks               chan []blocks.Block
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
59
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
60 61 62
		blks:             make(chan []blocks.Block, 1),
	}
}
63
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID) {
64
}
65
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
66 67 68 69 70
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
Hannah Howard's avatar
Hannah Howard committed
71 72 73
	var processedBlocks []blocks.Block
	testutil.AssertReceive(ctx, t, fal.blks, &processedBlocks, "did not process blocks")
	require.Equal(t, expectedBlocks, processedBlocks, "did not process correct blocks")
74
}
Hannah Howard's avatar
Hannah Howard committed
75

76
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
77
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
Hannah Howard's avatar
Hannah Howard committed
78 79 80
	var responses map[graphsync.RequestID]metadata.Metadata
	testutil.AssertReceive(ctx, t, fal.responses, &responses, "did not process responses")
	require.Equal(t, expectedResponses, responses, "did not process correct responses")
81 82
}

83
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
84 85
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
Hannah Howard's avatar
Hannah Howard committed
86
		require.NotEqual(t, key.requestID, requestID, "did not clean up request properly")
87 88 89 90
	}
	fal.responseChannelsLk.Unlock()
}

91
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
92 93 94 95 96 97 98 99 100 101
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

102
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
103 104
	return fal.asyncLoad(requestID, link)
}
105 106
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
107 108 109 110 111 112 113 114 115
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

116
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
117 118 119 120 121
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

122
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
123 124 125 126 127
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

128 129 130 131 132 133
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
134 135 136
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
137 138 139 140
	}
	return requestRecords
}

141 142 143 144 145 146 147 148 149 150 151
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
			Link:         cidlink.Link{Cid: block.Cid()},
			BlockPresent: present,
		})
	}
	return md
}

152
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
153
	md := metadataForBlocks(blks, present)
154
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
155
	require.NoError(t, err, "did not encode metadata")
156 157
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
158 159
		Data: metadataEncoded,
	}
160 161
}

162 163 164 165
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
166
	fal := newFakeAsyncLoader()
167
	requestManager := New(ctx, fal)
168 169 170 171 172
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
173
	peers := testutil.GeneratePeers(1)
174

175
	blockStore := make(map[ipld.Link][]byte)
176
	loader, storer := testutil.NewTestStore(blockStore)
177 178
	blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
179

180 181
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
182

183
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
184

Hannah Howard's avatar
Hannah Howard committed
185 186 187 188 189 190
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
	require.Equal(t, maxPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, maxPriority, requestRecords[1].gsr.Priority())
191

Hannah Howard's avatar
Hannah Howard committed
192 193
	require.Equal(t, blockChain1.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
194

195 196
	firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
197
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
198
	require.NoError(t, err, "did not encode metadata")
199
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
200
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
201
	require.NoError(t, err, "did not encode metadata")
202
	firstResponses := []gsmsg.GraphSyncResponse{
203 204
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
205 206
			Data: firstMetadataEncoded1,
		}),
207 208
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
209 210
			Data: firstMetadataEncoded2,
		}),
211 212
	}

213 214
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
215
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
216 217 218
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
219 220
	fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
	fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
221

222 223
	blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
224

225
	moreBlocks := blockChain2.RemainderBlocks(3)
226
	moreMetadata := metadataForBlocks(moreBlocks, true)
227
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
228
	require.NoError(t, err, "did not encode metadata")
229
	moreResponses := []gsmsg.GraphSyncResponse{
230 231
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
232 233
			Data: moreMetadataEncoded,
		}),
234 235
	}

236 237
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
238
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
239 240
		requestRecords[1].gsr.ID(): moreMetadata,
	})
241

242 243
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

244
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
245 246
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
247 248 249 250 251 252
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
253
	fal := newFakeAsyncLoader()
254
	requestManager := New(ctx, fal)
255 256 257 258 259 260 261
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
262
	peers := testutil.GeneratePeers(1)
263

264
	blockStore := make(map[ipld.Link][]byte)
265
	loader, storer := testutil.NewTestStore(blockStore)
266
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
267

268 269
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
270

271
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
272

273 274
	firstBlocks := blockChain.Blocks(0, 3)
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
275
	firstResponses := []gsmsg.GraphSyncResponse{
276 277
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
278 279
	}

280
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
281

282 283 284
	fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
285
	cancel1()
286
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
287 288 289

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
290

291 292
	moreBlocks := blockChain.RemainderBlocks(3)
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
293
	moreResponses := []gsmsg.GraphSyncResponse{
294 295
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
296
	}
297
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
298 299
	fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
300

301 302
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
303 304
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
305 306 307 308 309 310 311
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
312
	fal := newFakeAsyncLoader()
313
	requestManager := New(managerCtx, fal)
314 315 316 317
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
318
	peers := testutil.GeneratePeers(1)
319

320
	blockStore := make(map[ipld.Link][]byte)
321
	loader, storer := testutil.NewTestStore(blockStore)
322
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
323

324
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
325

326
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
327

328
	firstBlocks := blockChain.Blocks(0, 3)
329
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
330
	firstResponses := []gsmsg.GraphSyncResponse{
331
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
332
	}
333 334
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
335
	blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
336 337
	managerCancel()

338
	moreBlocks := blockChain.RemainderBlocks(3)
339
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
340
	moreResponses := []gsmsg.GraphSyncResponse{
341
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
342
	}
343 344
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
345
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
346
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
347 348 349 350 351 352
}

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
353
	fal := newFakeAsyncLoader()
354
	requestManager := New(ctx, fal)
355 356 357 358 359 360 361
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

362 363
	s := testutil.NewUnencodableSelectorSpec()
	r := cidlink.Link{Cid: testutil.GenerateCids(1)[0]}
364
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
365

366 367
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
368
}
369 370 371 372 373

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
374
	fal := newFakeAsyncLoader()
375
	requestManager := New(ctx, fal)
376 377 378 379 380
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
381
	peers := testutil.GeneratePeers(1)
382

383
	blockStore := make(map[ipld.Link][]byte)
384
	loader, storer := testutil.NewTestStore(blockStore)
385 386 387
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
388 389

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
390
	failedResponses := []gsmsg.GraphSyncResponse{
391
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
392
	}
393
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
394

395 396
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
397 398 399 400 401 402 403
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
404
	requestManager := New(ctx, fal)
405 406 407 408 409 410 411
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

412
	blockStore := make(map[ipld.Link][]byte)
413
	loader, storer := testutil.NewTestStore(blockStore)
414 415 416
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
417 418 419 420

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
421
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
422

423
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
424 425 426

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
427
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
428 429 430
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
431
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
432 433 434 435 436 437 438 439

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
440
	requestManager := New(ctx, fal)
441 442 443 444 445 446 447
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

448
	blockStore := make(map[ipld.Link][]byte)
449
	loader, storer := testutil.NewTestStore(blockStore)
450 451
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
452 453 454 455

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
456
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
457

458
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
459

460
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
461
	firstResponses := []gsmsg.GraphSyncResponse{
462
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
463
	}
464
	requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
465 466

	fal.verifyNoRemainingData(t, rr.gsr.ID())
467
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
468 469 470 471 472 473 474
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
475
	requestManager := New(ctx, fal)
476 477 478 479 480 481 482
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

483
	blockStore := make(map[ipld.Link][]byte)
484
	loader, storer := testutil.NewTestStore(blockStore)
485 486
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
487 488 489

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

490
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
491
	firstResponses := []gsmsg.GraphSyncResponse{
492
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
493 494
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
495
	for _, block := range blockChain.AllBlocks() {
496 497
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
498 499
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
500
	require.NotEqual(t, len(errs), 0, "did not send errors")
501
}
502 503 504 505 506 507

func TestEncodingExtensions(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
508
	requestManager := New(ctx, fal)
509 510 511 512 513 514 515
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

516
	blockStore := make(map[ipld.Link][]byte)
517
	loader, storer := testutil.NewTestStore(blockStore)
518
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
519 520 521 522 523 524 525 526 527 528 529 530 531

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
532 533 534 535 536

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData) error {
		data, has := responseData.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
537
		require.True(t, has, "did not receive extension data in response")
538 539 540 541
		receivedExtensionData <- data
		return <-expectedError
	}
	requestManager.RegisterHook(hook)
542
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
543 544 545 546 547

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
548 549
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension correctly")
550 551

	returnedData2, found := gsr.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
552 553
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode second extension correctly")
554

555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
		requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
571 572 573
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
		nextExpectedData := testutil.RandomBytes(100)

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
		requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
590 591
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
592 593 594
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
595
}