requestmanager_test.go 35.2 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

Hannah Howard's avatar
Hannah Howard committed
10 11 12
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15

Hannah Howard's avatar
Hannah Howard committed
16 17
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
18
	"github.com/ipfs/go-graphsync/dedupkey"
19
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
20 21 22 23
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
24 25 26 27
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
28 29
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
30
}
31

32 33 34 35
// fakePeerHandler is a test double for the peer handler: rather than talking
// to the network, its SendRequest method publishes each request on
// requestRecordChan for the test to inspect.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

36 37
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
38
	fph.requestRecordChan <- requestRecord{
39 40
		gsr: graphSyncRequest,
		p:   p,
41 42 43
	}
}

44 45 46 47 48 49
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
50 51 52
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
53 54 55 56
	}
	return requestRecords
}

57 58 59 60
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
61
			Link:         block.Cid(),
62 63 64 65 66 67
			BlockPresent: present,
		})
	}
	return md
}

68
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
69
	md := metadataForBlocks(blks, present)
70
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
71
	require.NoError(t, err, "did not encode metadata")
72 73
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
74 75
		Data: metadataEncoded,
	}
76 77
}

78 79
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
80
	td := newTestData(ctx, t)
81 82 83

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
84
	peers := testutil.GeneratePeers(1)
85

Hannah Howard's avatar
Hannah Howard committed
86
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
87

Hannah Howard's avatar
Hannah Howard committed
88 89
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
90

Hannah Howard's avatar
Hannah Howard committed
91
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
92

Hannah Howard's avatar
Hannah Howard committed
93 94 95 96
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
97 98
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
99

Hannah Howard's avatar
Hannah Howard committed
100
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
101
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
102

Hannah Howard's avatar
Hannah Howard committed
103 104
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
105
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
106
	require.NoError(t, err, "did not encode metadata")
107
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
108
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
109
	require.NoError(t, err, "did not encode metadata")
110
	firstResponses := []gsmsg.GraphSyncResponse{
111 112
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
113 114
			Data: firstMetadataEncoded1,
		}),
115 116
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
117 118
			Data: firstMetadataEncoded2,
		}),
119 120
	}

Hannah Howard's avatar
Hannah Howard committed
121
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
122 123
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
124 125 126
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
127 128
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
129

Hannah Howard's avatar
Hannah Howard committed
130
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
131
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
132

133
	moreBlocks := blockChain2.RemainderBlocks(3)
134
	moreMetadata := metadataForBlocks(moreBlocks, true)
135
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
136
	require.NoError(t, err, "did not encode metadata")
137
	moreResponses := []gsmsg.GraphSyncResponse{
138 139
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
140 141
			Data: moreMetadataEncoded,
		}),
142 143
	}

Hannah Howard's avatar
Hannah Howard committed
144
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
145 146
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
147 148
		requestRecords[1].gsr.ID(): moreMetadata,
	})
149

150
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
151

152
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
153 154
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
155 156 157 158
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
159
	td := newTestData(ctx, t)
160 161 162 163 164
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
165
	peers := testutil.GeneratePeers(1)
166

Hannah Howard's avatar
Hannah Howard committed
167 168
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
169

Hannah Howard's avatar
Hannah Howard committed
170
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
171

Hannah Howard's avatar
Hannah Howard committed
172
	firstBlocks := td.blockChain.Blocks(0, 3)
173
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
174
	firstResponses := []gsmsg.GraphSyncResponse{
175 176
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
177 178
	}

Hannah Howard's avatar
Hannah Howard committed
179
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
180

181 182
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
183
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
184
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
185
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
186 187 188

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
189

Hannah Howard's avatar
Hannah Howard committed
190
	moreBlocks := td.blockChain.RemainderBlocks(3)
191
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
192
	moreResponses := []gsmsg.GraphSyncResponse{
193 194
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
195
	}
Hannah Howard's avatar
Hannah Howard committed
196
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
197 198
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
199

200
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
201
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
202

203
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
204 205 206 207 208

	errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1)
	require.Len(t, errors, 1)
	_, ok := errors[0].(graphsync.RequestContextCancelledErr)
	require.True(t, ok)
209 210 211 212 213
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
214
	td := newTestData(managerCtx, t)
215 216
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
217
	peers := testutil.GeneratePeers(1)
218

Hannah Howard's avatar
Hannah Howard committed
219
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
220

Hannah Howard's avatar
Hannah Howard committed
221
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
222

Hannah Howard's avatar
Hannah Howard committed
223
	firstBlocks := td.blockChain.Blocks(0, 3)
224
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
225
	firstResponses := []gsmsg.GraphSyncResponse{
226
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
227
	}
Hannah Howard's avatar
Hannah Howard committed
228
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
229
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
230
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
231 232
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
233
	moreBlocks := td.blockChain.RemainderBlocks(3)
234
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
235
	moreResponses := []gsmsg.GraphSyncResponse{
236
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
237
	}
Hannah Howard's avatar
Hannah Howard committed
238
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
239
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
240
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
241
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
242 243
}

244 245
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
246
	td := newTestData(ctx, t)
247 248
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
249
	peers := testutil.GeneratePeers(1)
250

Hannah Howard's avatar
Hannah Howard committed
251
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
252

Hannah Howard's avatar
Hannah Howard committed
253
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
254
	failedResponses := []gsmsg.GraphSyncResponse{
255
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
256
	}
Hannah Howard's avatar
Hannah Howard committed
257
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
258

259 260
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
261 262 263 264
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
265
	td := newTestData(ctx, t)
266 267 268 269 270

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
271
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
272

Hannah Howard's avatar
Hannah Howard committed
273
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
274 275

	// async loaded response responds immediately
276
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
277

Hannah Howard's avatar
Hannah Howard committed
278
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
279 280 281

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
282
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
283 284
	}

Hannah Howard's avatar
Hannah Howard committed
285
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
286
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
287 288 289 290 291

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
292
	td := newTestData(ctx, t)
293 294 295 296 297

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

298 299 300 301
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
302
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
303

Hannah Howard's avatar
Hannah Howard committed
304
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
305 306

	// async loaded response responds immediately
307
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
308

Hannah Howard's avatar
Hannah Howard committed
309
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
310

Hannah Howard's avatar
Hannah Howard committed
311
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
312
	firstResponses := []gsmsg.GraphSyncResponse{
313
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
314
	}
Hannah Howard's avatar
Hannah Howard committed
315
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
316

317
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
318
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
319
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
320 321 322 323
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
324
	td := newTestData(ctx, t)
325 326 327 328 329

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
330
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
331

Hannah Howard's avatar
Hannah Howard committed
332
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
333

Hannah Howard's avatar
Hannah Howard committed
334
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
335
	firstResponses := []gsmsg.GraphSyncResponse{
336
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
337
	}
Hannah Howard's avatar
Hannah Howard committed
338 339
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
340
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
341
	}
342 343
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
344
	require.NotEqual(t, len(errs), 0, "did not send errors")
345
}
346 347 348

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
349
	td := newTestData(ctx, t)
350 351 352 353 354

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

355 356
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
357 358
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
359
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
360
		require.True(t, has, "did not receive extension data in response")
361
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
362 363 364 365 366 367 368 369
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
370
	}
Hannah Howard's avatar
Hannah Howard committed
371
	td.responseHooks.Register(hook)
372
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
373

Hannah Howard's avatar
Hannah Howard committed
374
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
375 376

	gsr := rr.gsr
377
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
378
	require.True(t, found)
379
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
380

381
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
382
	require.True(t, found)
383
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
384

385 386
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
387
		expectedUpdate := testutil.RandomBytes(100)
388 389 390 391 392 393 394
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
395
					Name: td.extensionName1,
396 397 398 399 400
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
401 402
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
403
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
404 405 406
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
407
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
408 409 410
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
411

Hannah Howard's avatar
Hannah Howard committed
412
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
413
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
414 415 416
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

417
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
418 419
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
420 421 422 423 424 425 426 427

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
428
					Name: td.extensionName1,
429 430 431 432 433
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
434 435
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
436
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
437 438 439
				Data: nextExpectedUpdate1,
			},
			{
440
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
441 442 443
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
444
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
445 446
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
447

Hannah Howard's avatar
Hannah Howard committed
448
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
449
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
450 451
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
452
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
453 454 455
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

456 457 458
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
459
}
460

Hannah Howard's avatar
Hannah Howard committed
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
486
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
487 488 489 490

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
491
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
492
	require.True(t, found)
493
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
494

495
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
496
	require.True(t, found)
497
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
514
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
515 516 517 518 519 520 521 522 523 524
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
525
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
526 527 528 529 530 531 532 533
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
534 535
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
536 537
			rr.gsr.ID(): firstMetadata,
		})
538
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
539 540

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
541
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
542 543 544 545 546 547 548 549 550 551 552
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
553
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
575
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
576 577 578 579 580 581 582 583 584 585
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
586
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
587 588 589
						Data: nextExpectedUpdate1,
					},
					{
590
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
591 592 593 594 595 596 597
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
598 599
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
600 601
			rr.gsr.ID(): nextMetadata,
		})
602
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
603 604

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
605
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
606 607
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
608
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
609 610 611 612 613 614 615 616 617 618 619
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
620
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
621 622 623 624 625 626 627 628 629 630 631 632
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

633 634
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
635
	td := newTestData(ctx, t)
636 637 638 639 640 641

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
642
		_, has := r.Extension(td.extensionName1)
643
		if has {
Eric Myhre's avatar
Eric Myhre committed
644
			ha.UseLinkTargetNodePrototypeChooser(td.blockChain.Chooser)
645 646 647
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
648
	td.requestHooks.Register(hook)
649

650
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
651
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
652

Hannah Howard's avatar
Hannah Howard committed
653
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
654

Hannah Howard's avatar
Hannah Howard committed
655 656 657 658 659 660
	dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
	require.True(t, has)
	key, err := dedupkey.DecodeDedupKey(dedupData)
	require.NoError(t, err)
	require.Equal(t, "chainstore", key)

Hannah Howard's avatar
Hannah Howard committed
661
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
662 663 664 665 666 667 668 669 670 671
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
672
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
673 674
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
675 676 677
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
678 679
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
680

Hannah Howard's avatar
Hannah Howard committed
681 682
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
683 684
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

// TestPauseResume verifies pausing a request from inside an incoming block
// hook and resuming it afterwards with UnpauseRequest.
//
// The flow is:
//  1. a block hook pauses the request when the pauseAt-th block arrives;
//  2. UnpauseRequest is attempted before the pause and must fail;
//  3. after the pause a cancel request is sent and no further responses flow;
//  4. UnpauseRequest sends a new request carrying a do-not-send-CIDs
//     extension plus the extensions passed to UnpauseRequest;
//  5. the remainder of the chain is delivered on the original channel.
func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block so the test can
	// attempt a resume while the request is still unpaused)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify responses are sent ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with do-not-send-CIDs & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	// check presence first so a missing extension is reported as such rather
	// than as a decode or length failure
	require.True(t, has)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where the request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
857 858 859 860 861
}

type testData struct {
	requestRecordChan chan requestRecord
	fph               *fakePeerHandler
862
	fal               *testloader.FakeAsyncLoader
Hannah Howard's avatar
Hannah Howard committed
863 864 865 866 867 868 869 870
	requestHooks      *hooks.OutgoingRequestHooks
	responseHooks     *hooks.IncomingResponseHooks
	blockHooks        *hooks.IncomingBlockHooks
	requestManager    *RequestManager
	blockStore        map[ipld.Link][]byte
	loader            ipld.Loader
	storer            ipld.Storer
	blockChain        *testutil.TestBlockChain
871 872 873 874 875 876
	extensionName1    graphsync.ExtensionName
	extensionData1    []byte
	extension1        graphsync.ExtensionData
	extensionName2    graphsync.ExtensionName
	extensionData2    []byte
	extension2        graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
877 878 879 880 881 882
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
883
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
884 885 886 887 888 889 890 891 892
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
893 894 895 896 897 898 899 900 901 902 903 904
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
905
	return td
906
}