requestmanager_test.go 35.6 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

Hannah Howard's avatar
Hannah Howard committed
10 11 12
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15

Hannah Howard's avatar
Hannah Howard committed
16 17
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
18
	"github.com/ipfs/go-graphsync/dedupkey"
19
	"github.com/ipfs/go-graphsync/listeners"
20
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
21
	"github.com/ipfs/go-graphsync/metadata"
22
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
23 24 25
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
26 27 28 29
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
30 31
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
32
}
33

34 35 36 37
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

38
func (fph *fakePeerHandler) SendRequest(p peer.ID,
39
	graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
40
	fph.requestRecordChan <- requestRecord{
41 42
		gsr: graphSyncRequest,
		p:   p,
43 44 45
	}
}

46 47 48 49 50 51
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
52 53 54
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
55 56 57 58
	}
	return requestRecords
}

59 60 61 62
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
63
			Link:         block.Cid(),
64 65 66 67 68 69
			BlockPresent: present,
		})
	}
	return md
}

70
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
71
	md := metadataForBlocks(blks, present)
72
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
73
	require.NoError(t, err, "did not encode metadata")
74 75
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
76 77
		Data: metadataEncoded,
	}
78 79
}

80 81
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
82
	td := newTestData(ctx, t)
83 84 85

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
86
	peers := testutil.GeneratePeers(1)
87

Hannah Howard's avatar
Hannah Howard committed
88
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
89

Hannah Howard's avatar
Hannah Howard committed
90 91
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
92

Hannah Howard's avatar
Hannah Howard committed
93
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
94

Hannah Howard's avatar
Hannah Howard committed
95 96 97 98
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
99 100
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
101

Hannah Howard's avatar
Hannah Howard committed
102
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
103
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
104

Hannah Howard's avatar
Hannah Howard committed
105 106
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
107
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
108
	require.NoError(t, err, "did not encode metadata")
109
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
110
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
111
	require.NoError(t, err, "did not encode metadata")
112
	firstResponses := []gsmsg.GraphSyncResponse{
113 114
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
115 116
			Data: firstMetadataEncoded1,
		}),
117 118
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
119 120
			Data: firstMetadataEncoded2,
		}),
121 122
	}

Hannah Howard's avatar
Hannah Howard committed
123
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
124 125
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
126 127 128
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
129 130
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
131

Hannah Howard's avatar
Hannah Howard committed
132
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
133
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
134

135
	moreBlocks := blockChain2.RemainderBlocks(3)
136
	moreMetadata := metadataForBlocks(moreBlocks, true)
137
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
138
	require.NoError(t, err, "did not encode metadata")
139
	moreResponses := []gsmsg.GraphSyncResponse{
140 141
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
142 143
			Data: moreMetadataEncoded,
		}),
144 145
	}

Hannah Howard's avatar
Hannah Howard committed
146
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
147 148
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
149 150
		requestRecords[1].gsr.ID(): moreMetadata,
	})
151

152
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
153

154
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
155 156
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
157 158 159 160
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
161
	td := newTestData(ctx, t)
162 163 164 165 166
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
167
	peers := testutil.GeneratePeers(1)
168

Hannah Howard's avatar
Hannah Howard committed
169 170
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
171

Hannah Howard's avatar
Hannah Howard committed
172
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
173

Hannah Howard's avatar
Hannah Howard committed
174
	firstBlocks := td.blockChain.Blocks(0, 3)
175
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
176
	firstResponses := []gsmsg.GraphSyncResponse{
177 178
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
179 180
	}

Hannah Howard's avatar
Hannah Howard committed
181
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
182

183 184
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
185
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
186
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
187
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
188 189 190

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
191

Hannah Howard's avatar
Hannah Howard committed
192
	moreBlocks := td.blockChain.RemainderBlocks(3)
193
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
194
	moreResponses := []gsmsg.GraphSyncResponse{
195 196
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
197
	}
Hannah Howard's avatar
Hannah Howard committed
198
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
199 200
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
201

202
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
203
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
204

205
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
206 207 208 209 210

	errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1)
	require.Len(t, errors, 1)
	_, ok := errors[0].(graphsync.RequestContextCancelledErr)
	require.True(t, ok)
211 212 213 214 215
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
216
	td := newTestData(managerCtx, t)
217 218
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
219
	peers := testutil.GeneratePeers(1)
220

Hannah Howard's avatar
Hannah Howard committed
221
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
222

Hannah Howard's avatar
Hannah Howard committed
223
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
224

Hannah Howard's avatar
Hannah Howard committed
225
	firstBlocks := td.blockChain.Blocks(0, 3)
226
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
227
	firstResponses := []gsmsg.GraphSyncResponse{
228
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
229
	}
Hannah Howard's avatar
Hannah Howard committed
230
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
231
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
232
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
233 234
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
235
	moreBlocks := td.blockChain.RemainderBlocks(3)
236
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
237
	moreResponses := []gsmsg.GraphSyncResponse{
238
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
239
	}
Hannah Howard's avatar
Hannah Howard committed
240
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
241
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
242
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
243
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
244 245
}

246 247
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
248
	td := newTestData(ctx, t)
249 250
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
251
	peers := testutil.GeneratePeers(1)
252

Hannah Howard's avatar
Hannah Howard committed
253
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
254

Hannah Howard's avatar
Hannah Howard committed
255
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
256
	failedResponses := []gsmsg.GraphSyncResponse{
257
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
258
	}
Hannah Howard's avatar
Hannah Howard committed
259
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
260

261 262
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
263 264 265 266
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
267
	td := newTestData(ctx, t)
268 269 270 271 272

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
273
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
274

Hannah Howard's avatar
Hannah Howard committed
275
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
276 277

	// async loaded response responds immediately
278
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
279

Hannah Howard's avatar
Hannah Howard committed
280
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
281 282 283

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
284
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
285 286
	}

Hannah Howard's avatar
Hannah Howard committed
287
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
288
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
289 290 291 292 293

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
294
	td := newTestData(ctx, t)
295 296 297 298 299

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

300 301 302 303
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
304
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
305

Hannah Howard's avatar
Hannah Howard committed
306
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
307 308

	// async loaded response responds immediately
309
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
310

Hannah Howard's avatar
Hannah Howard committed
311
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
312

Hannah Howard's avatar
Hannah Howard committed
313
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
314
	firstResponses := []gsmsg.GraphSyncResponse{
315
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
316
	}
Hannah Howard's avatar
Hannah Howard committed
317
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
318

319
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
320
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
321
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
322 323 324 325
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
326
	td := newTestData(ctx, t)
327 328 329 330 331

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
332
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
333

Hannah Howard's avatar
Hannah Howard committed
334
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
335

Hannah Howard's avatar
Hannah Howard committed
336
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
337
	firstResponses := []gsmsg.GraphSyncResponse{
338
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
339
	}
Hannah Howard's avatar
Hannah Howard committed
340 341
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
342
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
343
	}
344 345
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
346
	require.NotEqual(t, len(errs), 0, "did not send errors")
347
}
348 349 350

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
351
	td := newTestData(ctx, t)
352 353 354 355 356

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

357 358
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
359 360
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
361
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
362
		require.True(t, has, "did not receive extension data in response")
363
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
364 365 366 367 368 369 370 371
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
372
	}
Hannah Howard's avatar
Hannah Howard committed
373
	td.responseHooks.Register(hook)
374
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
375

Hannah Howard's avatar
Hannah Howard committed
376
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
377 378

	gsr := rr.gsr
379
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
380
	require.True(t, found)
381
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
382

383
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
384
	require.True(t, found)
385
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
386

387 388
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
389
		expectedUpdate := testutil.RandomBytes(100)
390 391 392 393 394 395 396
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
397
					Name: td.extensionName1,
398 399 400 401 402
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
403 404
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
405
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
406 407 408
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
409
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
410 411 412
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
413

Hannah Howard's avatar
Hannah Howard committed
414
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
415
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
416 417 418
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

419
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
420 421
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
422 423 424 425 426 427 428 429

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
430
					Name: td.extensionName1,
431 432 433 434 435
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
436 437
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
438
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
439 440 441
				Data: nextExpectedUpdate1,
			},
			{
442
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
443 444 445
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
446
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
447 448
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
449

Hannah Howard's avatar
Hannah Howard committed
450
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
451
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
452 453
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
454
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
455 456 457
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

458 459 460
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
461
}
462

Hannah Howard's avatar
Hannah Howard committed
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
488
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
489 490 491 492

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
493
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
494
	require.True(t, found)
495
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
496

497
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
498
	require.True(t, found)
499
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
516
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
517 518 519 520 521 522 523 524 525 526
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
527
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
528 529 530 531 532 533 534 535
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
536 537
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
538 539
			rr.gsr.ID(): firstMetadata,
		})
540
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
541 542

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
543
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
544 545 546 547 548 549 550 551 552 553 554
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
555
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
577
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
578 579 580 581 582 583 584 585 586 587
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
588
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
589 590 591
						Data: nextExpectedUpdate1,
					},
					{
592
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
593 594 595 596 597 598 599
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
600 601
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
602 603
			rr.gsr.ID(): nextMetadata,
		})
604
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
605 606

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
607
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
608 609
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
610
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
611 612 613 614 615 616 617 618 619 620 621
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
622
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
623 624 625 626 627 628 629 630 631 632 633 634
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

635 636
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
637
	td := newTestData(ctx, t)
638 639 640 641 642 643

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
644
		_, has := r.Extension(td.extensionName1)
645
		if has {
Eric Myhre's avatar
Eric Myhre committed
646
			ha.UseLinkTargetNodePrototypeChooser(td.blockChain.Chooser)
647 648 649
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
650
	td.requestHooks.Register(hook)
651

652
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
653
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
654

Hannah Howard's avatar
Hannah Howard committed
655
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
656

Hannah Howard's avatar
Hannah Howard committed
657 658 659 660 661 662
	dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
	require.True(t, has)
	key, err := dedupkey.DecodeDedupKey(dedupData)
	require.NoError(t, err)
	require.Equal(t, "chainstore", key)

Hannah Howard's avatar
Hannah Howard committed
663
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
664 665 666 667 668 669 670 671 672 673
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
674
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
675 676
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
677 678 679
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
680 681
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
682

Hannah Howard's avatar
Hannah Howard committed
683 684
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
685 686
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
859 860 861
}

type testData struct {
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
	requestRecordChan     chan requestRecord
	fph                   *fakePeerHandler
	fal                   *testloader.FakeAsyncLoader
	requestHooks          *hooks.OutgoingRequestHooks
	responseHooks         *hooks.IncomingResponseHooks
	blockHooks            *hooks.IncomingBlockHooks
	requestManager        *RequestManager
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChain            *testutil.TestBlockChain
	extensionName1        graphsync.ExtensionName
	extensionData1        []byte
	extension1            graphsync.ExtensionData
	extensionName2        graphsync.ExtensionName
	extensionData2        []byte
	extension2            graphsync.ExtensionData
	networkErrorListeners *listeners.NetworkErrorListeners
Hannah Howard's avatar
Hannah Howard committed
880 881 882 883 884 885
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
886
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
887 888 889
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
890 891
	td.networkErrorListeners = listeners.NewNetworkErrorListeners()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners)
Hannah Howard's avatar
Hannah Howard committed
892 893 894 895 896
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
897 898 899 900 901 902 903 904 905 906 907 908
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
909
	return td
910
}