requestmanager_test.go 34.9 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

10 11 12
	"github.com/ipfs/go-graphsync/cidset"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"

13
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
15
	"github.com/ipfs/go-graphsync/requestmanager/types"
16
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
17
	"github.com/stretchr/testify/require"
18 19 20

	"github.com/ipfs/go-graphsync/metadata"

21
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
22 23 24

	"github.com/ipld/go-ipld-prime"

25 26 27 28 29 30
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
31 32
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
33
}
34

35 36 37 38
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

39 40
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
41
	fph.requestRecordChan <- requestRecord{
42 43
		gsr: graphSyncRequest,
		p:   p,
44 45 46
	}
}

47 48 49 50 51 52
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
53 54 55
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
56 57 58 59
	}
	return requestRecords
}

60 61 62 63 64 65 66 67 68 69 70
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
			Link:         cidlink.Link{Cid: block.Cid()},
			BlockPresent: present,
		})
	}
	return md
}

71
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
72
	md := metadataForBlocks(blks, present)
73
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
74
	require.NoError(t, err, "did not encode metadata")
75 76
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
77 78
		Data: metadataEncoded,
	}
79 80
}

81 82
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
83
	td := newTestData(ctx, t)
84 85 86

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
87
	peers := testutil.GeneratePeers(1)
88

Hannah Howard's avatar
Hannah Howard committed
89
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
90

Hannah Howard's avatar
Hannah Howard committed
91 92
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
93

Hannah Howard's avatar
Hannah Howard committed
94
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
95

Hannah Howard's avatar
Hannah Howard committed
96 97 98 99
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
100 101
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
102

Hannah Howard's avatar
Hannah Howard committed
103
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
104
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
105

Hannah Howard's avatar
Hannah Howard committed
106 107
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
108
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
109
	require.NoError(t, err, "did not encode metadata")
110
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
111
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
112
	require.NoError(t, err, "did not encode metadata")
113
	firstResponses := []gsmsg.GraphSyncResponse{
114 115
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
116 117
			Data: firstMetadataEncoded1,
		}),
118 119
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
120 121
			Data: firstMetadataEncoded2,
		}),
122 123
	}

Hannah Howard's avatar
Hannah Howard committed
124
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
125 126
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
127 128 129
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
130 131
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
132

Hannah Howard's avatar
Hannah Howard committed
133
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
134
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
135

136
	moreBlocks := blockChain2.RemainderBlocks(3)
137
	moreMetadata := metadataForBlocks(moreBlocks, true)
138
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
139
	require.NoError(t, err, "did not encode metadata")
140
	moreResponses := []gsmsg.GraphSyncResponse{
141 142
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
143 144
			Data: moreMetadataEncoded,
		}),
145 146
	}

Hannah Howard's avatar
Hannah Howard committed
147
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
148 149
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
150 151
		requestRecords[1].gsr.ID(): moreMetadata,
	})
152

153
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
154

155
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
156 157
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
158 159 160 161
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
162
	td := newTestData(ctx, t)
163 164 165 166 167
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
168
	peers := testutil.GeneratePeers(1)
169

Hannah Howard's avatar
Hannah Howard committed
170 171
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
172

Hannah Howard's avatar
Hannah Howard committed
173
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
174

Hannah Howard's avatar
Hannah Howard committed
175
	firstBlocks := td.blockChain.Blocks(0, 3)
176
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
177
	firstResponses := []gsmsg.GraphSyncResponse{
178 179
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
180 181
	}

Hannah Howard's avatar
Hannah Howard committed
182
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
183

184 185
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
186
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
187
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
188
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
189 190 191

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
192

Hannah Howard's avatar
Hannah Howard committed
193
	moreBlocks := td.blockChain.RemainderBlocks(3)
194
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
195
	moreResponses := []gsmsg.GraphSyncResponse{
196 197
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
198
	}
Hannah Howard's avatar
Hannah Howard committed
199
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
200 201
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
202

203
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
204
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
205 206
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
207 208 209 210 211
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
212
	td := newTestData(managerCtx, t)
213 214
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
215
	peers := testutil.GeneratePeers(1)
216

Hannah Howard's avatar
Hannah Howard committed
217
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
218

Hannah Howard's avatar
Hannah Howard committed
219
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
220

Hannah Howard's avatar
Hannah Howard committed
221
	firstBlocks := td.blockChain.Blocks(0, 3)
222
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
223
	firstResponses := []gsmsg.GraphSyncResponse{
224
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
225
	}
Hannah Howard's avatar
Hannah Howard committed
226
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
227
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
228
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
229 230
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
231
	moreBlocks := td.blockChain.RemainderBlocks(3)
232
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
233
	moreResponses := []gsmsg.GraphSyncResponse{
234
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
235
	}
Hannah Howard's avatar
Hannah Howard committed
236
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
237
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
238
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
239
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
240 241
}

242 243
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
244
	td := newTestData(ctx, t)
245 246
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
247
	peers := testutil.GeneratePeers(1)
248

Hannah Howard's avatar
Hannah Howard committed
249
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
250

Hannah Howard's avatar
Hannah Howard committed
251
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
252
	failedResponses := []gsmsg.GraphSyncResponse{
253
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
254
	}
Hannah Howard's avatar
Hannah Howard committed
255
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
256

257 258
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
259 260 261 262
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
263
	td := newTestData(ctx, t)
264 265 266 267 268

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
269
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
270

Hannah Howard's avatar
Hannah Howard committed
271
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
272 273

	// async loaded response responds immediately
274
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
275

Hannah Howard's avatar
Hannah Howard committed
276
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
277 278 279

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
280
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
281 282
	}

Hannah Howard's avatar
Hannah Howard committed
283
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
284
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
285 286 287 288 289

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
290
	td := newTestData(ctx, t)
291 292 293 294 295

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

296 297 298 299
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
300
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
301

Hannah Howard's avatar
Hannah Howard committed
302
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
303 304

	// async loaded response responds immediately
305
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
306

Hannah Howard's avatar
Hannah Howard committed
307
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
308

Hannah Howard's avatar
Hannah Howard committed
309
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
310
	firstResponses := []gsmsg.GraphSyncResponse{
311
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
312
	}
Hannah Howard's avatar
Hannah Howard committed
313
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
314

315
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
316
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
317
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
318 319 320 321
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
322
	td := newTestData(ctx, t)
323 324 325 326 327

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
328
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
329

Hannah Howard's avatar
Hannah Howard committed
330
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
331

Hannah Howard's avatar
Hannah Howard committed
332
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
333
	firstResponses := []gsmsg.GraphSyncResponse{
334
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
335
	}
Hannah Howard's avatar
Hannah Howard committed
336 337
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
338
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
339
	}
340 341
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
342
	require.NotEqual(t, len(errs), 0, "did not send errors")
343
}
344 345 346

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
347
	td := newTestData(ctx, t)
348 349 350 351 352

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

353 354
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
355 356
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
357
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
358
		require.True(t, has, "did not receive extension data in response")
359
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
360 361 362 363 364 365 366 367
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
368
	}
Hannah Howard's avatar
Hannah Howard committed
369
	td.responseHooks.Register(hook)
370
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
371

Hannah Howard's avatar
Hannah Howard committed
372
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
373 374

	gsr := rr.gsr
375
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
376
	require.True(t, found)
377
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
378

379
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
380
	require.True(t, found)
381
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
382

383 384
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
385
		expectedUpdate := testutil.RandomBytes(100)
386 387 388 389 390 391 392
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
393
					Name: td.extensionName1,
394 395 396 397 398
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
399 400
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
401
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
402 403 404
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
405
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
406 407 408
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
409

Hannah Howard's avatar
Hannah Howard committed
410
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
411
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
412 413 414
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

415
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
416 417
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
418 419 420 421 422 423 424 425

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
426
					Name: td.extensionName1,
427 428 429 430 431
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
432 433
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
434
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
435 436 437
				Data: nextExpectedUpdate1,
			},
			{
438
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
439 440 441
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
442
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
443 444
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
445

Hannah Howard's avatar
Hannah Howard committed
446
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
447
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
448 449
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
450
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
451 452 453
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

454 455 456
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
457
}
458

Hannah Howard's avatar
Hannah Howard committed
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
484
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
485 486 487 488

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
489
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
490
	require.True(t, found)
491
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
492

493
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
494
	require.True(t, found)
495
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
512
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
513 514 515 516 517 518 519 520 521 522
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
523
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
524 525 526 527 528 529 530 531
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
532 533
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
534 535
			rr.gsr.ID(): firstMetadata,
		})
536
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
537 538

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
539
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
540 541 542 543 544 545 546 547 548 549 550
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
551
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
573
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
574 575 576 577 578 579 580 581 582 583
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
584
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
585 586 587
						Data: nextExpectedUpdate1,
					},
					{
588
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
589 590 591 592 593 594 595
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
596 597
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
598 599
			rr.gsr.ID(): nextMetadata,
		})
600
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
601 602

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
603
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
604 605
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
606
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
607 608 609 610 611 612 613 614 615 616 617
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
618
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
619 620 621 622 623 624 625 626 627 628 629 630
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

631 632
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
633
	td := newTestData(ctx, t)
634 635 636 637 638 639

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
640
		_, has := r.Extension(td.extensionName1)
641
		if has {
Hannah Howard's avatar
Hannah Howard committed
642
			ha.UseLinkTargetNodeStyleChooser(td.blockChain.Chooser)
643 644 645
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
646
	td.requestHooks.Register(hook)
647

648
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
649
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
650

Hannah Howard's avatar
Hannah Howard committed
651
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
652

Hannah Howard's avatar
Hannah Howard committed
653
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
654 655 656 657 658 659 660 661 662 663
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
664
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
665 666
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
667 668 669
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
670 671
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
672

Hannah Howard's avatar
Hannah Howard committed
673 674
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
675 676
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
849 850 851 852 853
}

type testData struct {
	requestRecordChan chan requestRecord
	fph               *fakePeerHandler
854
	fal               *testloader.FakeAsyncLoader
Hannah Howard's avatar
Hannah Howard committed
855 856 857 858 859 860 861 862
	requestHooks      *hooks.OutgoingRequestHooks
	responseHooks     *hooks.IncomingResponseHooks
	blockHooks        *hooks.IncomingBlockHooks
	requestManager    *RequestManager
	blockStore        map[ipld.Link][]byte
	loader            ipld.Loader
	storer            ipld.Storer
	blockChain        *testutil.TestBlockChain
863 864 865 866 867 868
	extensionName1    graphsync.ExtensionName
	extensionData1    []byte
	extension1        graphsync.ExtensionData
	extensionName2    graphsync.ExtensionName
	extensionData2    []byte
	extension2        graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
869 870 871 872 873 874
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
875
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
876 877 878 879 880 881 882 883 884
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
885 886 887 888 889 890 891 892 893 894 895 896
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
897
	return td
898
}