requestmanager_test.go 35.6 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

10 11
	"github.com/ipfs/go-graphsync/listeners"

Hannah Howard's avatar
Hannah Howard committed
12 13 14
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
15
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
16
	"github.com/stretchr/testify/require"
17

Hannah Howard's avatar
Hannah Howard committed
18 19
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
20
	"github.com/ipfs/go-graphsync/dedupkey"
21
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
22
	"github.com/ipfs/go-graphsync/metadata"
23
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
24 25 26
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
27 28 29 30
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
31 32
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
33
}
34

35 36 37 38
// fakePeerHandler is a test double that publishes every request passed to its
// SendRequest method on requestRecordChan so tests can inspect what was sent.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

39
func (fph *fakePeerHandler) SendRequest(p peer.ID,
40
	graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee) {
41
	fph.requestRecordChan <- requestRecord{
42 43
		gsr: graphSyncRequest,
		p:   p,
44 45 46
	}
}

47 48 49 50 51 52
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
53 54 55
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
56 57 58 59
	}
	return requestRecords
}

60 61 62 63
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
64
			Link:         block.Cid(),
65 66 67 68 69 70
			BlockPresent: present,
		})
	}
	return md
}

71
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
72
	md := metadataForBlocks(blks, present)
73
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
74
	require.NoError(t, err, "did not encode metadata")
75 76
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
77 78
		Data: metadataEncoded,
	}
79 80
}

81 82
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
83
	td := newTestData(ctx, t)
84 85 86

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
87
	peers := testutil.GeneratePeers(1)
88

Hannah Howard's avatar
Hannah Howard committed
89
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
90

Hannah Howard's avatar
Hannah Howard committed
91 92
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
93

Hannah Howard's avatar
Hannah Howard committed
94
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
95

Hannah Howard's avatar
Hannah Howard committed
96 97 98 99
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
100 101
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
102

Hannah Howard's avatar
Hannah Howard committed
103
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
104
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
105

Hannah Howard's avatar
Hannah Howard committed
106 107
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
108
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
109
	require.NoError(t, err, "did not encode metadata")
110
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
111
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
112
	require.NoError(t, err, "did not encode metadata")
113
	firstResponses := []gsmsg.GraphSyncResponse{
114 115
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
116 117
			Data: firstMetadataEncoded1,
		}),
118 119
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
120 121
			Data: firstMetadataEncoded2,
		}),
122 123
	}

Hannah Howard's avatar
Hannah Howard committed
124
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
125 126
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
127 128 129
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
130 131
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
132

Hannah Howard's avatar
Hannah Howard committed
133
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
134
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
135

136
	moreBlocks := blockChain2.RemainderBlocks(3)
137
	moreMetadata := metadataForBlocks(moreBlocks, true)
138
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
139
	require.NoError(t, err, "did not encode metadata")
140
	moreResponses := []gsmsg.GraphSyncResponse{
141 142
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
143 144
			Data: moreMetadataEncoded,
		}),
145 146
	}

Hannah Howard's avatar
Hannah Howard committed
147
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
148 149
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
150 151
		requestRecords[1].gsr.ID(): moreMetadata,
	})
152

153
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
154

155
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
156 157
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
158 159 160 161
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
162
	td := newTestData(ctx, t)
163 164 165 166 167
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
168
	peers := testutil.GeneratePeers(1)
169

Hannah Howard's avatar
Hannah Howard committed
170 171
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
172

Hannah Howard's avatar
Hannah Howard committed
173
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
174

Hannah Howard's avatar
Hannah Howard committed
175
	firstBlocks := td.blockChain.Blocks(0, 3)
176
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
177
	firstResponses := []gsmsg.GraphSyncResponse{
178 179
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
180 181
	}

Hannah Howard's avatar
Hannah Howard committed
182
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
183

184 185
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
186
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
187
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
188
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
189 190 191

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
192

Hannah Howard's avatar
Hannah Howard committed
193
	moreBlocks := td.blockChain.RemainderBlocks(3)
194
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
195
	moreResponses := []gsmsg.GraphSyncResponse{
196 197
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
198
	}
Hannah Howard's avatar
Hannah Howard committed
199
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
200 201
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
202

203
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
204
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
205

206
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
207 208 209 210 211

	errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1)
	require.Len(t, errors, 1)
	_, ok := errors[0].(graphsync.RequestContextCancelledErr)
	require.True(t, ok)
212 213 214 215 216
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
217
	td := newTestData(managerCtx, t)
218 219
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
220
	peers := testutil.GeneratePeers(1)
221

Hannah Howard's avatar
Hannah Howard committed
222
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
223

Hannah Howard's avatar
Hannah Howard committed
224
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
225

Hannah Howard's avatar
Hannah Howard committed
226
	firstBlocks := td.blockChain.Blocks(0, 3)
227
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
228
	firstResponses := []gsmsg.GraphSyncResponse{
229
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
230
	}
Hannah Howard's avatar
Hannah Howard committed
231
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
232
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
233
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
234 235
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
236
	moreBlocks := td.blockChain.RemainderBlocks(3)
237
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
238
	moreResponses := []gsmsg.GraphSyncResponse{
239
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
240
	}
Hannah Howard's avatar
Hannah Howard committed
241
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
242
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
243
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
244
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
245 246
}

247 248
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
249
	td := newTestData(ctx, t)
250 251
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
252
	peers := testutil.GeneratePeers(1)
253

Hannah Howard's avatar
Hannah Howard committed
254
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
255

Hannah Howard's avatar
Hannah Howard committed
256
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
257
	failedResponses := []gsmsg.GraphSyncResponse{
258
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
259
	}
Hannah Howard's avatar
Hannah Howard committed
260
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
261

262 263
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
264 265 266 267
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
268
	td := newTestData(ctx, t)
269 270 271 272 273

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
274
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
275

Hannah Howard's avatar
Hannah Howard committed
276
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
277 278

	// async loaded response responds immediately
279
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
280

Hannah Howard's avatar
Hannah Howard committed
281
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
282 283 284

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
285
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
286 287
	}

Hannah Howard's avatar
Hannah Howard committed
288
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
289
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
290 291 292 293 294

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
295
	td := newTestData(ctx, t)
296 297 298 299 300

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

301 302 303 304
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
305
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
306

Hannah Howard's avatar
Hannah Howard committed
307
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
308 309

	// async loaded response responds immediately
310
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
311

Hannah Howard's avatar
Hannah Howard committed
312
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
313

Hannah Howard's avatar
Hannah Howard committed
314
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
315
	firstResponses := []gsmsg.GraphSyncResponse{
316
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
317
	}
Hannah Howard's avatar
Hannah Howard committed
318
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
319

320
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
321
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
322
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
323 324 325 326
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
327
	td := newTestData(ctx, t)
328 329 330 331 332

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
333
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
334

Hannah Howard's avatar
Hannah Howard committed
335
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
336

Hannah Howard's avatar
Hannah Howard committed
337
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
338
	firstResponses := []gsmsg.GraphSyncResponse{
339
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
340
	}
Hannah Howard's avatar
Hannah Howard committed
341 342
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
343
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
344
	}
345 346
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
347
	require.NotEqual(t, len(errs), 0, "did not send errors")
348
}
349 350 351

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
352
	td := newTestData(ctx, t)
353 354 355 356 357

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

358 359
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
360 361
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
362
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
363
		require.True(t, has, "did not receive extension data in response")
364
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
365 366 367 368 369 370 371 372
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
373
	}
Hannah Howard's avatar
Hannah Howard committed
374
	td.responseHooks.Register(hook)
375
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
376

Hannah Howard's avatar
Hannah Howard committed
377
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
378 379

	gsr := rr.gsr
380
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
381
	require.True(t, found)
382
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
383

384
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
385
	require.True(t, found)
386
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
387

388 389
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
390
		expectedUpdate := testutil.RandomBytes(100)
391 392 393 394 395 396 397
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
398
					Name: td.extensionName1,
399 400 401 402 403
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
404 405
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
406
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
407 408 409
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
410
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
411 412 413
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
414

Hannah Howard's avatar
Hannah Howard committed
415
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
416
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
417 418 419
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

420
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
421 422
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
423 424 425 426 427 428 429 430

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
431
					Name: td.extensionName1,
432 433 434 435 436
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
437 438
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
439
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
440 441 442
				Data: nextExpectedUpdate1,
			},
			{
443
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
444 445 446
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
447
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
448 449
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
450

Hannah Howard's avatar
Hannah Howard committed
451
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
452
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
453 454
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
455
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
456 457 458
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

459 460 461
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
462
}
463

Hannah Howard's avatar
Hannah Howard committed
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
489
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
490 491 492 493

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
494
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
495
	require.True(t, found)
496
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
497

498
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
499
	require.True(t, found)
500
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
517
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
518 519 520 521 522 523 524 525 526 527
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
528
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
529 530 531 532 533 534 535 536
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
537 538
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
539 540
			rr.gsr.ID(): firstMetadata,
		})
541
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
542 543

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
544
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
545 546 547 548 549 550 551 552 553 554 555
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
556
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
578
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
579 580 581 582 583 584 585 586 587 588
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
589
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
590 591 592
						Data: nextExpectedUpdate1,
					},
					{
593
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
594 595 596 597 598 599 600
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
601 602
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
603 604
			rr.gsr.ID(): nextMetadata,
		})
605
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
606 607

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
608
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
609 610
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
611
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
612 613 614 615 616 617 618 619 620 621 622
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
623
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
624 625 626 627 628 629 630 631 632 633 634 635
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

636 637
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
638
	td := newTestData(ctx, t)
639 640 641 642 643 644

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
645
		_, has := r.Extension(td.extensionName1)
646
		if has {
Eric Myhre's avatar
Eric Myhre committed
647
			ha.UseLinkTargetNodePrototypeChooser(td.blockChain.Chooser)
648 649 650
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
651
	td.requestHooks.Register(hook)
652

653
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
654
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
655

Hannah Howard's avatar
Hannah Howard committed
656
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
657

Hannah Howard's avatar
Hannah Howard committed
658 659 660 661 662 663
	dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
	require.True(t, has)
	key, err := dedupkey.DecodeDedupKey(dedupData)
	require.NoError(t, err)
	require.Equal(t, "chainstore", key)

Hannah Howard's avatar
Hannah Howard committed
664
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
665 666 667 668 669 670 671 672 673 674
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
675
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
676 677
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
678 679 680
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
681 682
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
683

Hannah Howard's avatar
Hannah Howard committed
684 685
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
686 687
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

// TestPauseResume exercises pausing a request from inside an incoming block
// hook and resuming it afterwards with extra extensions.
//
// The hook holds on the block before the pause point so the test can confirm
// that UnpauseRequest fails with "request is not paused" while the request is
// still running, then pauses on block number pauseAt. After the pause the test
// expects an outgoing cancel, no further responses, and, once unpaused, a new
// request carrying a do-not-send-CIDs set of pauseAt entries plus both test
// extensions. The remaining blocks are then delivered and verified.
func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify the responses read are ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with do-not-send-CIDs & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	// check presence before decoding so a missing extension is reported as such
	// rather than as a decode or length failure
	require.True(t, has)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where the request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
860 861 862
}

type testData struct {
863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880
	requestRecordChan     chan requestRecord
	fph                   *fakePeerHandler
	fal                   *testloader.FakeAsyncLoader
	requestHooks          *hooks.OutgoingRequestHooks
	responseHooks         *hooks.IncomingResponseHooks
	blockHooks            *hooks.IncomingBlockHooks
	requestManager        *RequestManager
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChain            *testutil.TestBlockChain
	extensionName1        graphsync.ExtensionName
	extensionData1        []byte
	extension1            graphsync.ExtensionData
	extensionName2        graphsync.ExtensionName
	extensionData2        []byte
	extension2            graphsync.ExtensionData
	networkErrorListeners *listeners.NetworkErrorListeners
Hannah Howard's avatar
Hannah Howard committed
881 882 883 884 885 886
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
887
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
888 889 890
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
891 892
	td.networkErrorListeners = listeners.NewNetworkErrorListeners()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners)
Hannah Howard's avatar
Hannah Howard committed
893 894 895 896 897
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
898 899 900 901 902 903 904 905 906 907 908 909
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
910
	return td
911
}