requestmanager_test.go 34.6 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

10 11 12
	"github.com/ipfs/go-graphsync/cidset"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"

13
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
15
	"github.com/ipfs/go-graphsync/requestmanager/types"
16
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
17
	"github.com/stretchr/testify/require"
18 19 20

	"github.com/ipfs/go-graphsync/metadata"

21
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
22 23 24

	"github.com/ipld/go-ipld-prime"

25 26 27 28 29 30
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
31 32
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
33
}
34

35 36 37 38
// fakePeerHandler is a test double whose SendRequest method forwards every
// request it is given onto requestRecordChan instead of sending it anywhere.
type fakePeerHandler struct {
	// requestRecordChan receives one requestRecord per SendRequest call.
	requestRecordChan chan requestRecord
}

39 40
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
41
	fph.requestRecordChan <- requestRecord{
42 43
		gsr: graphSyncRequest,
		p:   p,
44 45 46
	}
}

47 48 49 50 51 52
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
53 54 55
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
56 57 58 59
	}
	return requestRecords
}

60 61 62 63 64 65 66 67 68 69 70
// metadataForBlocks produces one metadata item per block, in the same order
// as blks. Every item links to its block's CID and carries the given present
// flag as BlockPresent.
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	items := make(metadata.Metadata, len(blks))
	for i := range blks {
		items[i] = metadata.Item{
			Link:         cidlink.Link{Cid: blks[i].Cid()},
			BlockPresent: present,
		}
	}
	return items
}

71
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
72
	md := metadataForBlocks(blks, present)
73
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
74
	require.NoError(t, err, "did not encode metadata")
75 76
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
77 78
		Data: metadataEncoded,
	}
79 80
}

81 82
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
83
	td := newTestData(ctx, t)
84 85 86

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
87
	peers := testutil.GeneratePeers(1)
88

Hannah Howard's avatar
Hannah Howard committed
89
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
90

Hannah Howard's avatar
Hannah Howard committed
91 92
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
93

Hannah Howard's avatar
Hannah Howard committed
94
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
95

Hannah Howard's avatar
Hannah Howard committed
96 97 98 99
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
100 101
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
102

Hannah Howard's avatar
Hannah Howard committed
103
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
104
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
105

Hannah Howard's avatar
Hannah Howard committed
106 107
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
108
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
109
	require.NoError(t, err, "did not encode metadata")
110
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
111
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
112
	require.NoError(t, err, "did not encode metadata")
113
	firstResponses := []gsmsg.GraphSyncResponse{
114 115
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
116 117
			Data: firstMetadataEncoded1,
		}),
118 119
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
120 121
			Data: firstMetadataEncoded2,
		}),
122 123
	}

Hannah Howard's avatar
Hannah Howard committed
124
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
125 126
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
127 128 129
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
130 131
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
132

Hannah Howard's avatar
Hannah Howard committed
133
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
134
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
135

136
	moreBlocks := blockChain2.RemainderBlocks(3)
137
	moreMetadata := metadataForBlocks(moreBlocks, true)
138
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
139
	require.NoError(t, err, "did not encode metadata")
140
	moreResponses := []gsmsg.GraphSyncResponse{
141 142
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
143 144
			Data: moreMetadataEncoded,
		}),
145 146
	}

Hannah Howard's avatar
Hannah Howard committed
147
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
148 149
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
150 151
		requestRecords[1].gsr.ID(): moreMetadata,
	})
152

153
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
154

155
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
156 157
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
158 159 160 161
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
162
	td := newTestData(ctx, t)
163 164 165 166 167
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
168
	peers := testutil.GeneratePeers(1)
169

Hannah Howard's avatar
Hannah Howard committed
170 171
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
172

Hannah Howard's avatar
Hannah Howard committed
173
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
174

Hannah Howard's avatar
Hannah Howard committed
175
	firstBlocks := td.blockChain.Blocks(0, 3)
176
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
177
	firstResponses := []gsmsg.GraphSyncResponse{
178 179
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
180 181
	}

Hannah Howard's avatar
Hannah Howard committed
182
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
183

184 185
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
186
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
187
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
188
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
189 190 191

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
192

Hannah Howard's avatar
Hannah Howard committed
193
	moreBlocks := td.blockChain.RemainderBlocks(3)
194
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
195
	moreResponses := []gsmsg.GraphSyncResponse{
196 197
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
198
	}
Hannah Howard's avatar
Hannah Howard committed
199
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
200 201
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
202

203
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
204
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
205 206
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
207 208 209 210 211
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
212
	td := newTestData(managerCtx, t)
213 214
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
215
	peers := testutil.GeneratePeers(1)
216

Hannah Howard's avatar
Hannah Howard committed
217
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
218

Hannah Howard's avatar
Hannah Howard committed
219
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
220

Hannah Howard's avatar
Hannah Howard committed
221
	firstBlocks := td.blockChain.Blocks(0, 3)
222
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
223
	firstResponses := []gsmsg.GraphSyncResponse{
224
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
225
	}
Hannah Howard's avatar
Hannah Howard committed
226
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
227
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
228
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
229 230
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
231
	moreBlocks := td.blockChain.RemainderBlocks(3)
232
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
233
	moreResponses := []gsmsg.GraphSyncResponse{
234
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
235
	}
Hannah Howard's avatar
Hannah Howard committed
236
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
237
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
238
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
239
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
240 241
}

242 243
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
244
	td := newTestData(ctx, t)
245 246
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
247
	peers := testutil.GeneratePeers(1)
248

Hannah Howard's avatar
Hannah Howard committed
249
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
250

Hannah Howard's avatar
Hannah Howard committed
251
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
252
	failedResponses := []gsmsg.GraphSyncResponse{
253
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
254
	}
Hannah Howard's avatar
Hannah Howard committed
255
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
256

257 258
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
259 260 261 262
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
263
	td := newTestData(ctx, t)
264 265 266 267 268

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
269
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
270

Hannah Howard's avatar
Hannah Howard committed
271
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
272 273

	// async loaded response responds immediately
274
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
275

Hannah Howard's avatar
Hannah Howard committed
276
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
277 278 279

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
280
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
281 282
	}

Hannah Howard's avatar
Hannah Howard committed
283
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
284
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
285 286 287 288 289

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
290
	td := newTestData(ctx, t)
291 292 293 294 295

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
296
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
297

Hannah Howard's avatar
Hannah Howard committed
298
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
299 300

	// async loaded response responds immediately
301
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
302

Hannah Howard's avatar
Hannah Howard committed
303
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
304

Hannah Howard's avatar
Hannah Howard committed
305
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
306
	firstResponses := []gsmsg.GraphSyncResponse{
307
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
308
	}
Hannah Howard's avatar
Hannah Howard committed
309
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
310

311
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
312
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
313 314 315 316
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
317
	td := newTestData(ctx, t)
318 319 320 321 322

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
323
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
324

Hannah Howard's avatar
Hannah Howard committed
325
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
326

Hannah Howard's avatar
Hannah Howard committed
327
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
328
	firstResponses := []gsmsg.GraphSyncResponse{
329
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
330
	}
Hannah Howard's avatar
Hannah Howard committed
331 332
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
333
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
334
	}
335 336
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
337
	require.NotEqual(t, len(errs), 0, "did not send errors")
338
}
339 340 341

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
342
	td := newTestData(ctx, t)
343 344 345 346 347

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

348 349
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
350 351
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
352
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
353
		require.True(t, has, "did not receive extension data in response")
354
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
355 356 357 358 359 360 361 362
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
363
	}
Hannah Howard's avatar
Hannah Howard committed
364
	td.responseHooks.Register(hook)
365
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
366

Hannah Howard's avatar
Hannah Howard committed
367
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
368 369

	gsr := rr.gsr
370
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
371
	require.True(t, found)
372
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
373

374
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
375
	require.True(t, found)
376
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
377

378 379
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
380
		expectedUpdate := testutil.RandomBytes(100)
381 382 383 384 385 386 387
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
388
					Name: td.extensionName1,
389 390 391 392 393
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
394 395
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
396
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
397 398 399
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
400
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
401 402 403
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
404

Hannah Howard's avatar
Hannah Howard committed
405
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
406
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
407 408 409
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

410
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
411 412
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
413 414 415 416 417 418 419 420

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
421
					Name: td.extensionName1,
422 423 424 425 426
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
427 428
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
429
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
430 431 432
				Data: nextExpectedUpdate1,
			},
			{
433
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
434 435 436
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
437
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
438 439
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
440

Hannah Howard's avatar
Hannah Howard committed
441
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
442
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
443 444
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
445
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
446 447 448
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

449 450 451
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
452
}
453

Hannah Howard's avatar
Hannah Howard committed
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
479
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
480 481 482 483

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
484
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
485
	require.True(t, found)
486
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
487

488
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
489
	require.True(t, found)
490
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
507
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
508 509 510 511 512 513 514 515 516 517
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
518
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
519 520 521 522 523 524 525 526
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
527 528
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
529 530
			rr.gsr.ID(): firstMetadata,
		})
531
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
532 533

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
534
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
535 536 537 538 539 540 541 542 543 544 545
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
546
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
568
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
569 570 571 572 573 574 575 576 577 578
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
579
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
580 581 582
						Data: nextExpectedUpdate1,
					},
					{
583
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
584 585 586 587 588 589 590
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
591 592
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
593 594
			rr.gsr.ID(): nextMetadata,
		})
595
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
596 597

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
598
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
599 600
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
601
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
602 603 604 605 606 607 608 609 610 611 612
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
613
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
614 615 616 617 618 619 620 621 622 623 624 625
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

626 627
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
628
	td := newTestData(ctx, t)
629 630 631 632 633 634

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
635
		_, has := r.Extension(td.extensionName1)
636
		if has {
Hannah Howard's avatar
Hannah Howard committed
637
			ha.UseLinkTargetNodeStyleChooser(td.blockChain.Chooser)
638 639 640
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
641
	td.requestHooks.Register(hook)
642

643
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
644
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
645

Hannah Howard's avatar
Hannah Howard committed
646
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
647

Hannah Howard's avatar
Hannah Howard committed
648
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
649 650 651 652 653 654 655 656 657 658
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
659
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
660 661
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
662 663 664
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
665 666
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
667

Hannah Howard's avatar
Hannah Howard committed
668 669
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
670 671
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

// TestPauseResume verifies pausing a request from inside an incoming block
// hook and later resuming it via UnpauseRequest. It checks that:
//   - unpausing a request that is not paused returns an error,
//   - only the blocks before the pause point are delivered before the pause,
//   - pausing sends a cancel request to the peer,
//   - resuming sends a new request carrying the do-not-send-CIDs extension
//     plus any extensions passed to UnpauseRequest,
//   - the remaining blocks are delivered after the resume.
func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// Set up a hook that pauses at the 3rd block. On the 2nd block it waits
	// until the test has attempted an unpause, so that attempt is guaranteed
	// to happen while the request is still running.
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// Attempt to unpause while the request is not paused (the hook on the
	// second block keeps the request from reaching the pause point).
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// Verify responses are delivered ONLY for blocks BEFORE the pause.
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// Wait for the pause to occur.
	<-holdForPause

	// Read the outgoing cancel request.
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// Verify no further responses come through.
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// Unpause, attaching two extensions to the resumed request.
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// Verify the new request carries the do-not-send-CIDs extension and the
	// other extensions. Presence is asserted before the data is decoded so a
	// missing extension is reported as such rather than as a decode failure.
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	require.True(t, has)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// Process responses for the blocks that were not delivered before the pause.
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// Verify the correct results are returned, picking up where the request
	// was paused.
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
844 845 846 847 848
}

type testData struct {
	requestRecordChan chan requestRecord
	fph               *fakePeerHandler
849
	fal               *testloader.FakeAsyncLoader
Hannah Howard's avatar
Hannah Howard committed
850 851 852 853 854 855 856 857
	requestHooks      *hooks.OutgoingRequestHooks
	responseHooks     *hooks.IncomingResponseHooks
	blockHooks        *hooks.IncomingBlockHooks
	requestManager    *RequestManager
	blockStore        map[ipld.Link][]byte
	loader            ipld.Loader
	storer            ipld.Storer
	blockChain        *testutil.TestBlockChain
858 859 860 861 862 863
	extensionName1    graphsync.ExtensionName
	extensionData1    []byte
	extension1        graphsync.ExtensionData
	extensionName2    graphsync.ExtensionName
	extensionData2    []byte
	extension2        graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
864 865 866 867 868 869
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
870
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
871 872 873 874 875 876 877 878 879
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
880 881 882 883 884 885 886 887 888 889 890 891
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
892
	return td
893
}