requestmanager_test.go 35.3 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

Hannah Howard's avatar
Hannah Howard committed
10 11 12
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15

Hannah Howard's avatar
Hannah Howard committed
16 17
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
18
	"github.com/ipfs/go-graphsync/dedupkey"
19
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
20
	"github.com/ipfs/go-graphsync/metadata"
21
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
22 23 24
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
25 26 27 28
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
29 30
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
31
}
32

33 34 35 36
// fakePeerHandler is a test double whose SendRequest method publishes every
// request it is given onto requestRecordChan so tests can inspect it.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

37
func (fph *fakePeerHandler) SendRequest(p peer.ID,
38
	graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
39
	fph.requestRecordChan <- requestRecord{
40 41
		gsr: graphSyncRequest,
		p:   p,
42 43 44
	}
}

45 46 47 48 49 50
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
51 52 53
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
54 55 56 57
	}
	return requestRecords
}

58 59 60 61
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
62
			Link:         block.Cid(),
63 64 65 66 67 68
			BlockPresent: present,
		})
	}
	return md
}

69
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
70
	md := metadataForBlocks(blks, present)
71
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
72
	require.NoError(t, err, "did not encode metadata")
73 74
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
75 76
		Data: metadataEncoded,
	}
77 78
}

79 80
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
81
	td := newTestData(ctx, t)
82 83 84

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
85
	peers := testutil.GeneratePeers(1)
86

Hannah Howard's avatar
Hannah Howard committed
87
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
88

Hannah Howard's avatar
Hannah Howard committed
89 90
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
91

Hannah Howard's avatar
Hannah Howard committed
92
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
93

Hannah Howard's avatar
Hannah Howard committed
94 95 96 97
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
98 99
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
100

Hannah Howard's avatar
Hannah Howard committed
101
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
102
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
103

Hannah Howard's avatar
Hannah Howard committed
104 105
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
106
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
107
	require.NoError(t, err, "did not encode metadata")
108
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
109
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
110
	require.NoError(t, err, "did not encode metadata")
111
	firstResponses := []gsmsg.GraphSyncResponse{
112 113
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
114 115
			Data: firstMetadataEncoded1,
		}),
116 117
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
118 119
			Data: firstMetadataEncoded2,
		}),
120 121
	}

Hannah Howard's avatar
Hannah Howard committed
122
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
123 124
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
125 126 127
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
128 129
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
130

Hannah Howard's avatar
Hannah Howard committed
131
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
132
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
133

134
	moreBlocks := blockChain2.RemainderBlocks(3)
135
	moreMetadata := metadataForBlocks(moreBlocks, true)
136
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
137
	require.NoError(t, err, "did not encode metadata")
138
	moreResponses := []gsmsg.GraphSyncResponse{
139 140
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
141 142
			Data: moreMetadataEncoded,
		}),
143 144
	}

Hannah Howard's avatar
Hannah Howard committed
145
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
146 147
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
148 149
		requestRecords[1].gsr.ID(): moreMetadata,
	})
150

151
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
152

153
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
154 155
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
156 157 158 159
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
160
	td := newTestData(ctx, t)
161 162 163 164 165
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
166
	peers := testutil.GeneratePeers(1)
167

Hannah Howard's avatar
Hannah Howard committed
168 169
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
170

Hannah Howard's avatar
Hannah Howard committed
171
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
172

Hannah Howard's avatar
Hannah Howard committed
173
	firstBlocks := td.blockChain.Blocks(0, 3)
174
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
175
	firstResponses := []gsmsg.GraphSyncResponse{
176 177
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
178 179
	}

Hannah Howard's avatar
Hannah Howard committed
180
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
181

182 183
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
184
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
185
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
186
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
187 188 189

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
190

Hannah Howard's avatar
Hannah Howard committed
191
	moreBlocks := td.blockChain.RemainderBlocks(3)
192
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
193
	moreResponses := []gsmsg.GraphSyncResponse{
194 195
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
196
	}
Hannah Howard's avatar
Hannah Howard committed
197
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
198 199
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
200

201
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
202
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
203

204
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
205 206 207 208 209

	errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1)
	require.Len(t, errors, 1)
	_, ok := errors[0].(graphsync.RequestContextCancelledErr)
	require.True(t, ok)
210 211 212 213 214
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
215
	td := newTestData(managerCtx, t)
216 217
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
218
	peers := testutil.GeneratePeers(1)
219

Hannah Howard's avatar
Hannah Howard committed
220
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
221

Hannah Howard's avatar
Hannah Howard committed
222
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
223

Hannah Howard's avatar
Hannah Howard committed
224
	firstBlocks := td.blockChain.Blocks(0, 3)
225
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
226
	firstResponses := []gsmsg.GraphSyncResponse{
227
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
228
	}
Hannah Howard's avatar
Hannah Howard committed
229
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
230
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
231
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
232 233
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
234
	moreBlocks := td.blockChain.RemainderBlocks(3)
235
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
236
	moreResponses := []gsmsg.GraphSyncResponse{
237
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
238
	}
Hannah Howard's avatar
Hannah Howard committed
239
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
240
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
241
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
242
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
243 244
}

245 246
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
247
	td := newTestData(ctx, t)
248 249
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
250
	peers := testutil.GeneratePeers(1)
251

Hannah Howard's avatar
Hannah Howard committed
252
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
253

Hannah Howard's avatar
Hannah Howard committed
254
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
255
	failedResponses := []gsmsg.GraphSyncResponse{
256
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
257
	}
Hannah Howard's avatar
Hannah Howard committed
258
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
259

260 261
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
262 263 264 265
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
266
	td := newTestData(ctx, t)
267 268 269 270 271

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
272
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
273

Hannah Howard's avatar
Hannah Howard committed
274
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
275 276

	// async loaded response responds immediately
277
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
278

Hannah Howard's avatar
Hannah Howard committed
279
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
280 281 282

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
283
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
284 285
	}

Hannah Howard's avatar
Hannah Howard committed
286
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
287
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
288 289 290 291 292

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
293
	td := newTestData(ctx, t)
294 295 296 297 298

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

299 300 301 302
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
303
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
304

Hannah Howard's avatar
Hannah Howard committed
305
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
306 307

	// async loaded response responds immediately
308
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
309

Hannah Howard's avatar
Hannah Howard committed
310
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
311

Hannah Howard's avatar
Hannah Howard committed
312
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
313
	firstResponses := []gsmsg.GraphSyncResponse{
314
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
315
	}
Hannah Howard's avatar
Hannah Howard committed
316
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
317

318
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
319
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
320
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
321 322 323 324
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
325
	td := newTestData(ctx, t)
326 327 328 329 330

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
331
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
332

Hannah Howard's avatar
Hannah Howard committed
333
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
334

Hannah Howard's avatar
Hannah Howard committed
335
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
336
	firstResponses := []gsmsg.GraphSyncResponse{
337
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
338
	}
Hannah Howard's avatar
Hannah Howard committed
339 340
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
341
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
342
	}
343 344
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
345
	require.NotEqual(t, len(errs), 0, "did not send errors")
346
}
347 348 349

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
350
	td := newTestData(ctx, t)
351 352 353 354 355

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

356 357
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
358 359
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
360
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
361
		require.True(t, has, "did not receive extension data in response")
362
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
363 364 365 366 367 368 369 370
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
371
	}
Hannah Howard's avatar
Hannah Howard committed
372
	td.responseHooks.Register(hook)
373
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
374

Hannah Howard's avatar
Hannah Howard committed
375
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
376 377

	gsr := rr.gsr
378
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
379
	require.True(t, found)
380
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
381

382
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
383
	require.True(t, found)
384
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
385

386 387
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
388
		expectedUpdate := testutil.RandomBytes(100)
389 390 391 392 393 394 395
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
396
					Name: td.extensionName1,
397 398 399 400 401
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
402 403
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
404
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
405 406 407
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
408
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
409 410 411
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
412

Hannah Howard's avatar
Hannah Howard committed
413
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
414
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
415 416 417
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

418
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
419 420
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
421 422 423 424 425 426 427 428

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
429
					Name: td.extensionName1,
430 431 432 433 434
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
435 436
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
437
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
438 439 440
				Data: nextExpectedUpdate1,
			},
			{
441
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
442 443 444
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
445
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
446 447
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
448

Hannah Howard's avatar
Hannah Howard committed
449
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
450
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
451 452
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
453
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
454 455 456
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

457 458 459
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
460
}
461

Hannah Howard's avatar
Hannah Howard committed
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
487
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
488 489 490 491

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
492
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
493
	require.True(t, found)
494
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
495

496
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
497
	require.True(t, found)
498
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
515
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
516 517 518 519 520 521 522 523 524 525
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
526
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
527 528 529 530 531 532 533 534
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
535 536
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
537 538
			rr.gsr.ID(): firstMetadata,
		})
539
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
540 541

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
542
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
543 544 545 546 547 548 549 550 551 552 553
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
554
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
576
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
577 578 579 580 581 582 583 584 585 586
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
587
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
588 589 590
						Data: nextExpectedUpdate1,
					},
					{
591
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
592 593 594 595 596 597 598
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
599 600
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
601 602
			rr.gsr.ID(): nextMetadata,
		})
603
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
604 605

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
606
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
607 608
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
609
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
610 611 612 613 614 615 616 617 618 619 620
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
621
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
622 623 624 625 626 627 628 629 630 631 632 633
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

634 635
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
636
	td := newTestData(ctx, t)
637 638 639 640 641 642

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
643
		_, has := r.Extension(td.extensionName1)
644
		if has {
Eric Myhre's avatar
Eric Myhre committed
645
			ha.UseLinkTargetNodePrototypeChooser(td.blockChain.Chooser)
646 647 648
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
649
	td.requestHooks.Register(hook)
650

651
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
652
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
653

Hannah Howard's avatar
Hannah Howard committed
654
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
655

Hannah Howard's avatar
Hannah Howard committed
656 657 658 659 660 661
	dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
	require.True(t, has)
	key, err := dedupkey.DecodeDedupKey(dedupData)
	require.NoError(t, err)
	require.Equal(t, "chainstore", key)

Hannah Howard's avatar
Hannah Howard committed
662
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
663 664 665 666 667 668 669 670 671 672
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
673
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
674 675
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
676 677 678
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
679 680
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
681

Hannah Howard's avatar
Hannah Howard committed
682 683
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
684 685
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)

	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

// TestPauseResume exercises pausing a request from inside an incoming block
// hook (via hookActions.PauseRequest) and resuming it with UnpauseRequest.
//
// The sequence verified is:
//  1. UnpauseRequest on a request that is not paused fails with
//     "request is not paused".
//  2. Once the hook pauses at block pauseAt, only the blocks before the pause
//     are delivered and a cancel request is sent to the network.
//  3. UnpauseRequest sends a new request carrying the do-not-send-cids
//     extension (with pauseAt entries) plus the extensions passed to it.
//  4. The remainder of the chain is delivered after the resume.
func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block so the test
	// can attempt an unpause while the request is still running)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify responses are delivered ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with do-not-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	// check presence before decoding so a missing extension is reported as such
	require.True(t, has)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where the request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
858 859 860 861 862
}

type testData struct {
	requestRecordChan chan requestRecord
	fph               *fakePeerHandler
863
	fal               *testloader.FakeAsyncLoader
Hannah Howard's avatar
Hannah Howard committed
864 865 866 867 868 869 870 871
	requestHooks      *hooks.OutgoingRequestHooks
	responseHooks     *hooks.IncomingResponseHooks
	blockHooks        *hooks.IncomingBlockHooks
	requestManager    *RequestManager
	blockStore        map[ipld.Link][]byte
	loader            ipld.Loader
	storer            ipld.Storer
	blockChain        *testutil.TestBlockChain
872 873 874 875 876 877
	extensionName1    graphsync.ExtensionName
	extensionData1    []byte
	extension1        graphsync.ExtensionData
	extensionName2    graphsync.ExtensionName
	extensionData2    []byte
	extension2        graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
878 879 880 881 882 883
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
884
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
885 886 887 888 889 890 891 892 893
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
894 895 896 897 898 899 900 901 902 903 904 905
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
906
	return td
907
}