requestmanager_test.go 35.7 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8 9
	"testing"
	"time"

Hannah Howard's avatar
Hannah Howard committed
10 11 12
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15

Hannah Howard's avatar
Hannah Howard committed
16 17
	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
18
	"github.com/ipfs/go-graphsync/dedupkey"
19
	"github.com/ipfs/go-graphsync/listeners"
20
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
21
	"github.com/ipfs/go-graphsync/metadata"
22
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
23 24 25
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
	"github.com/ipfs/go-graphsync/requestmanager/testloader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
26 27 28 29
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
30 31
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
32
}
33

34 35 36 37
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

38 39 40 41 42 43 44 45
func (fph *fakePeerHandler) BuildMessage(p peer.ID, blkSize uint64,
	requestBuilder func(b *gsmsg.Builder), notifees []notifications.Notifee) {
	builder := gsmsg.NewBuilder(gsmsg.Topic(0))
	requestBuilder(builder)
	message, err := builder.Build()
	if err != nil {
		panic(err)
	}
46
	fph.requestRecordChan <- requestRecord{
47
		gsr: message.Requests()[0],
48
		p:   p,
49 50 51
	}
}

52 53 54 55 56 57
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
58 59 60
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
61 62 63 64
	}
	return requestRecords
}

65 66 67 68
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	md := make(metadata.Metadata, 0, len(blks))
	for _, block := range blks {
		md = append(md, metadata.Item{
69
			Link:         block.Cid(),
70 71 72 73 74 75
			BlockPresent: present,
		})
	}
	return md
}

76
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
77
	md := metadataForBlocks(blks, present)
78
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
79
	require.NoError(t, err, "did not encode metadata")
80 81
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
82 83
		Data: metadataEncoded,
	}
84 85
}

86 87
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
88
	td := newTestData(ctx, t)
89 90 91

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
92
	peers := testutil.GeneratePeers(1)
93

Hannah Howard's avatar
Hannah Howard committed
94
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
95

Hannah Howard's avatar
Hannah Howard committed
96 97
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
98

Hannah Howard's avatar
Hannah Howard committed
99
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
100

Hannah Howard's avatar
Hannah Howard committed
101 102 103 104
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
105 106
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
107

Hannah Howard's avatar
Hannah Howard committed
108
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
109
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
110

Hannah Howard's avatar
Hannah Howard committed
111 112
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
113
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
114
	require.NoError(t, err, "did not encode metadata")
115
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
116
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
117
	require.NoError(t, err, "did not encode metadata")
118
	firstResponses := []gsmsg.GraphSyncResponse{
119 120
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
121 122
			Data: firstMetadataEncoded1,
		}),
123 124
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
125 126
			Data: firstMetadataEncoded2,
		}),
127 128
	}

Hannah Howard's avatar
Hannah Howard committed
129
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
130 131
	td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
132 133 134
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
135 136
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
137

Hannah Howard's avatar
Hannah Howard committed
138
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
139
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
140

141
	moreBlocks := blockChain2.RemainderBlocks(3)
142
	moreMetadata := metadataForBlocks(moreBlocks, true)
143
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
144
	require.NoError(t, err, "did not encode metadata")
145
	moreResponses := []gsmsg.GraphSyncResponse{
146 147
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
148 149
			Data: moreMetadataEncoded,
		}),
150 151
	}

Hannah Howard's avatar
Hannah Howard committed
152
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
153 154
	td.fal.VerifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
155 156
		requestRecords[1].gsr.ID(): moreMetadata,
	})
157

158
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
159

160
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
161 162
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
163 164 165 166
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
167
	td := newTestData(ctx, t)
168 169 170 171 172
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
173
	peers := testutil.GeneratePeers(1)
174

Hannah Howard's avatar
Hannah Howard committed
175 176
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
177

Hannah Howard's avatar
Hannah Howard committed
178
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
179

Hannah Howard's avatar
Hannah Howard committed
180
	firstBlocks := td.blockChain.Blocks(0, 3)
181
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
182
	firstResponses := []gsmsg.GraphSyncResponse{
183 184
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
185 186
	}

Hannah Howard's avatar
Hannah Howard committed
187
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
188

189 190
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
191
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
192
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
193
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
194 195 196

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
197

Hannah Howard's avatar
Hannah Howard committed
198
	moreBlocks := td.blockChain.RemainderBlocks(3)
199
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
200
	moreResponses := []gsmsg.GraphSyncResponse{
201 202
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
203
	}
Hannah Howard's avatar
Hannah Howard committed
204
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
205 206
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
207

208
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
209
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
210

211
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
212 213 214 215 216

	errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1)
	require.Len(t, errors, 1)
	_, ok := errors[0].(graphsync.RequestContextCancelledErr)
	require.True(t, ok)
217 218 219 220 221
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
222
	td := newTestData(managerCtx, t)
223 224
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
225
	peers := testutil.GeneratePeers(1)
226

Hannah Howard's avatar
Hannah Howard committed
227
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
228

Hannah Howard's avatar
Hannah Howard committed
229
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
230

Hannah Howard's avatar
Hannah Howard committed
231
	firstBlocks := td.blockChain.Blocks(0, 3)
232
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
233
	firstResponses := []gsmsg.GraphSyncResponse{
234
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
235
	}
Hannah Howard's avatar
Hannah Howard committed
236
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
237
	td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
238
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
239 240
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
241
	moreBlocks := td.blockChain.RemainderBlocks(3)
242
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
243
	moreResponses := []gsmsg.GraphSyncResponse{
244
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
245
	}
Hannah Howard's avatar
Hannah Howard committed
246
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
247
	td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks)
248
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
249
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
250 251
}

252 253
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
254
	td := newTestData(ctx, t)
255 256
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
257
	peers := testutil.GeneratePeers(1)
258

Hannah Howard's avatar
Hannah Howard committed
259
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
260

Hannah Howard's avatar
Hannah Howard committed
261
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
262
	failedResponses := []gsmsg.GraphSyncResponse{
263
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
264
	}
Hannah Howard's avatar
Hannah Howard committed
265
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
266

267 268
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
269 270 271 272
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
273
	td := newTestData(ctx, t)
274 275 276 277 278

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
279
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
280

Hannah Howard's avatar
Hannah Howard committed
281
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
282 283

	// async loaded response responds immediately
284
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
285

Hannah Howard's avatar
Hannah Howard committed
286
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
287 288 289

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
290
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
291 292
	}

Hannah Howard's avatar
Hannah Howard committed
293
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
294
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
295 296 297 298 299

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
300
	td := newTestData(ctx, t)
301 302 303 304 305

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

306 307 308 309
	called := make(chan struct{})
	td.responseHooks.Register(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
		close(called)
	})
Hannah Howard's avatar
Hannah Howard committed
310
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
311

Hannah Howard's avatar
Hannah Howard committed
312
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
313 314

	// async loaded response responds immediately
315
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
316

Hannah Howard's avatar
Hannah Howard committed
317
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
318

Hannah Howard's avatar
Hannah Howard committed
319
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
320
	firstResponses := []gsmsg.GraphSyncResponse{
321
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
322
	}
Hannah Howard's avatar
Hannah Howard committed
323
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
324

325
	td.fal.VerifyNoRemainingData(t, rr.gsr.ID())
326
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
327
	testutil.AssertDoesReceive(requestCtx, t, called, "response hooks called for response")
328 329 330 331
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
332
	td := newTestData(ctx, t)
333 334 335 336 337

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
338
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
339

Hannah Howard's avatar
Hannah Howard committed
340
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
341

Hannah Howard's avatar
Hannah Howard committed
342
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
343
	firstResponses := []gsmsg.GraphSyncResponse{
344
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
345
	}
Hannah Howard's avatar
Hannah Howard committed
346 347
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
348
		td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
349
	}
350 351
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
352
	require.NotEqual(t, len(errs), 0, "did not send errors")
353
}
354 355 356

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
357
	td := newTestData(ctx, t)
358 359 360 361 362

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

363 364
	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
365 366
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
367
		data, has := responseData.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
368
		require.True(t, has, "did not receive extension data in response")
369
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
370 371 372 373 374 375 376 377
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
378
	}
Hannah Howard's avatar
Hannah Howard committed
379
	td.responseHooks.Register(hook)
380
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
381

Hannah Howard's avatar
Hannah Howard committed
382
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
383 384

	gsr := rr.gsr
385
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
386
	require.True(t, found)
387
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
388

389
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
390
	require.True(t, found)
391
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
392

393 394
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
395
		expectedUpdate := testutil.RandomBytes(100)
396 397 398 399 400 401 402
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
403
					Name: td.extensionName1,
404 405 406 407 408
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
409 410
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
411
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
412 413 414
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
415
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
416 417 418
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
419

Hannah Howard's avatar
Hannah Howard committed
420
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
421
		receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
422 423 424
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

425
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
426 427
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
428 429 430 431 432 433 434 435

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
436
					Name: td.extensionName1,
437 438 439 440 441
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
442 443
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
444
				Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
445 446 447
				Data: nextExpectedUpdate1,
			},
			{
448
				Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
449 450 451
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
452
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
453 454
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
455

Hannah Howard's avatar
Hannah Howard committed
456
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
457
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
458 459
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
460
		receivedUpdateData, has = rr.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
461 462 463
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

464 465 466
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
467
}
468

Hannah Howard's avatar
Hannah Howard committed
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)
494
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
Hannah Howard's avatar
Hannah Howard committed
495 496 497 498

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	gsr := rr.gsr
499
	returnedData1, found := gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
500
	require.True(t, found)
501
	require.Equal(t, td.extensionData1, returnedData1, "did not encode first extension correctly")
Hannah Howard's avatar
Hannah Howard committed
502

503
	returnedData2, found := gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
504
	require.True(t, found)
505
	require.Equal(t, td.extensionData2, returnedData2, "did not encode second extension correctly")
Hannah Howard's avatar
Hannah Howard committed
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
522
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
523 524 525 526 527 528 529 530 531 532
					Data: expectedData,
				},
			),
		}
		for i := range firstBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(firstBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
533
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
534 535 536 537 538 539 540 541
						Data: expectedUpdate,
					},
				}
			}
			expectedUpdateChan <- update
		}

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
542 543
		td.fal.VerifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
544 545
			rr.gsr.ID(): firstMetadata,
		})
546
		td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks)
Hannah Howard's avatar
Hannah Howard committed
547 548

		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
549
		receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
550 551 552 553 554 555 556 557 558 559 560
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range firstBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata")
561
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
			require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
583
					Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
584 585 586 587 588 589 590 591 592 593
					Data: nextExpectedData,
				},
			),
		}
		for i := range nextBlocks {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == len(nextBlocks)-1 {
				update = []graphsync.ExtensionData{
					{
594
						Name: td.extensionName1,
Hannah Howard's avatar
Hannah Howard committed
595 596 597
						Data: nextExpectedUpdate1,
					},
					{
598
						Name: td.extensionName2,
Hannah Howard's avatar
Hannah Howard committed
599 600 601 602 603 604 605
						Data: nextExpectedUpdate2,
					},
				}
			}
			expectedUpdateChan <- update
		}
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
606 607
		td.fal.VerifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
Hannah Howard's avatar
Hannah Howard committed
608 609
			rr.gsr.ID(): nextMetadata,
		})
610
		td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks)
Hannah Howard's avatar
Hannah Howard committed
611 612

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
613
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
614 615
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
616
		receivedUpdateData, has = ur.gsr.Extension(td.extensionName2)
Hannah Howard's avatar
Hannah Howard committed
617 618 619 620 621 622 623 624 625 626 627
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		for _, blk := range nextBlocks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status")
			metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata")
628
			receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1)
Hannah Howard's avatar
Hannah Howard committed
629 630 631 632 633 634 635 636 637 638 639 640
			require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

641 642
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
643
	td := newTestData(ctx, t)
644 645 646 647 648 649

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
650
		_, has := r.Extension(td.extensionName1)
651
		if has {
Eric Myhre's avatar
Eric Myhre committed
652
			ha.UseLinkTargetNodePrototypeChooser(td.blockChain.Chooser)
653 654 655
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
656
	td.requestHooks.Register(hook)
657

658
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
Hannah Howard's avatar
Hannah Howard committed
659
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
660

Hannah Howard's avatar
Hannah Howard committed
661
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
662

Hannah Howard's avatar
Hannah Howard committed
663 664 665 666 667 668
	dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
	require.True(t, has)
	key, err := dedupkey.DecodeDedupKey(dedupData)
	require.NoError(t, err)
	require.Equal(t, "chainstore", key)

Hannah Howard's avatar
Hannah Howard committed
669
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
670 671 672 673 674 675 676 677 678 679
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
680
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
681 682
	td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
683 684 685
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
686 687
	td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
688

Hannah Howard's avatar
Hannah Howard committed
689 690
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
691 692
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
	td.fal.VerifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.VerifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

func TestPauseResume(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForResumeAttempt := make(chan struct{})
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt-1 {
			<-holdForResumeAttempt
		}
		if blocksReceived == pauseAt {
			hookActions.PauseRequest()
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// attempt to unpause while request is not paused (note: hook on second block will keep it from
	// reaching pause point)
	err = td.requestManager.UnpauseRequest(rr.gsr.ID())
	require.EqualError(t, err, "request is not paused")
	close(holdForResumeAttempt)
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
}
func TestPauseResumeExternal(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	blocksReceived := 0
	holdForPause := make(chan struct{})
	pauseAt := 3

	// setup hook to pause at 3rd block (and wait on second block for resume while unpaused test)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		blocksReceived++
		if blocksReceived == pauseAt {
			err := td.requestManager.PauseRequest(responseData.RequestID())
			require.NoError(t, err)
			close(holdForPause)
		}
	}
	td.blockHooks.Register(hook)

	// Start request
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// Start processing responses
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
			Data: mdEncoded,
		}),
	}
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
	// verify responses sent read ONLY for blocks BEFORE the pause
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1)
	// wait for the pause to occur
	<-holdForPause

	// read the outgoing cancel request
	pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	require.True(t, pauseCancel.gsr.IsCancel())

	// verify no further responses come through
	time.Sleep(100 * time.Millisecond)
	testutil.AssertChannelEmpty(t, returnedResponseChan, "no response should be sent request is paused")
	td.fal.CleanupRequest(rr.gsr.ID())

	// unpause
	err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
	require.NoError(t, err)

	// verify the correct new request with Do-no-send-cids & other extensions
	resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
	doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
	doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
	require.NoError(t, err)
	require.Equal(t, pauseAt, doNotSendCids.Len())
	require.True(t, has)
	ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
	require.True(t, has)
	require.Equal(t, td.extensionData1, ext1Data)
	ext2Data, has := resumedRequest.gsr.Extension(td.extensionName2)
	require.True(t, has)
	require.Equal(t, td.extensionData2, ext2Data)

	// process responses
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt))
	td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())

	// verify the correct results are returned, picking up after where there request was paused
	td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
865 866 867
}

type testData struct {
868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885
	requestRecordChan     chan requestRecord
	fph                   *fakePeerHandler
	fal                   *testloader.FakeAsyncLoader
	requestHooks          *hooks.OutgoingRequestHooks
	responseHooks         *hooks.IncomingResponseHooks
	blockHooks            *hooks.IncomingBlockHooks
	requestManager        *RequestManager
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChain            *testutil.TestBlockChain
	extensionName1        graphsync.ExtensionName
	extensionData1        []byte
	extension1            graphsync.ExtensionData
	extensionName2        graphsync.ExtensionName
	extensionData2        []byte
	extension2            graphsync.ExtensionData
	networkErrorListeners *listeners.NetworkErrorListeners
Hannah Howard's avatar
Hannah Howard committed
886 887 888 889 890 891
}

func newTestData(ctx context.Context, t *testing.T) *testData {
	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{td.requestRecordChan}
892
	td.fal = testloader.NewFakeAsyncLoader()
Hannah Howard's avatar
Hannah Howard committed
893 894 895
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
896 897
	td.networkErrorListeners = listeners.NewNetworkErrorListeners()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks, td.networkErrorListeners)
Hannah Howard's avatar
Hannah Howard committed
898 899 900 901 902
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
903 904 905 906 907 908 909 910 911 912 913 914
	td.extensionData1 = testutil.RandomBytes(100)
	td.extensionName1 = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension1 = graphsync.ExtensionData{
		Name: td.extensionName1,
		Data: td.extensionData1,
	}
	td.extensionData2 = testutil.RandomBytes(100)
	td.extensionName2 = graphsync.ExtensionName("HappyLand/Happenstance")
	td.extension2 = graphsync.ExtensionData{
		Name: td.extensionName2,
		Data: td.extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
915
	return td
916
}