responsemanager_test.go 22.6 KB
Newer Older
1 2 3 4
package responsemanager

import (
	"context"
5
	"errors"
6 7 8 9 10
	"math/rand"
	"sync"
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12
	gsmsg "github.com/ipfs/go-graphsync/message"
13
	"github.com/ipfs/go-graphsync/responsemanager/blockhooks"
14
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
15 16
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
	"github.com/ipfs/go-graphsync/responsemanager/requesthooks"
17
	"github.com/ipfs/go-graphsync/selectorvalidator"
18
	"github.com/ipfs/go-graphsync/testutil"
19
	"github.com/ipfs/go-peertaskqueue/peertask"
20
	ipld "github.com/ipld/go-ipld-prime"
21
	ipldfree "github.com/ipld/go-ipld-prime/impl/free"
22
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
23
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
24
	"github.com/stretchr/testify/require"
25 26 27 28 29
)

type fakeQueryQueue struct {
	popWait   sync.WaitGroup
	queriesLk sync.RWMutex
30
	queries   []*peertask.QueueTask
31 32
}

33
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
34
	fqq.queriesLk.Lock()
35 36 37 38 39 40

	// This isn't quite right as the queue should deduplicate requests, but
	// it's good enough.
	for _, task := range tasks {
		fqq.queries = append(fqq.queries, peertask.NewQueueTask(task, to, time.Now()))
	}
41 42 43
	fqq.queriesLk.Unlock()
}

44
func (fqq *fakeQueryQueue) PopTasks(targetWork int) (peer.ID, []*peertask.Task, int) {
45 46 47 48
	fqq.popWait.Wait()
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	if len(fqq.queries) == 0 {
49
		return "", nil, -1
50
	}
51 52
	// We're not bothering to implement "work"
	task := fqq.queries[0]
53
	fqq.queries = fqq.queries[1:]
54
	return task.Target, []*peertask.Task{&task.Task}, 0
55 56
}

57
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
58 59 60
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	for i, query := range fqq.queries {
61 62
		if query.Target == p && query.Topic == topic {
			fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
63 64 65 66
		}
	}
}

67 68 69 70
// TasksDone is deliberately a no-op: this fake does not track active tasks.
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {}

71 72 73
// ThawRound does nothing in this fake.
func (fqq *fakeQueryQueue) ThawRound() {}
74 75 76 77 78 79 80 81 82 83 84 85

// fakePeerManager hands out one preconfigured PeerResponseSender regardless
// of which peer is asked for, and remembers the peer most recently requested.
type fakePeerManager struct {
	lastPeer           peer.ID
	peerResponseSender peerresponsemanager.PeerResponseSender
}

// SenderForPeer records p as the last requested peer and returns the single
// configured sender, whichever peer was asked for.
func (fpm *fakePeerManager) SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender {
	sender := fpm.peerResponseSender
	fpm.lastPeer = p
	return sender
}

type sentResponse struct {
86
	requestID graphsync.RequestID
87 88 89 90
	link      ipld.Link
	data      []byte
}

91 92 93 94 95 96 97 98 99
// sentExtension captures the arguments of one SendExtensionData call made on
// the fake peer response sender.
type sentExtension struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

// completedRequest records how a request was finished on the fake peer
// response sender: the request ID and the status code it ended with.
type completedRequest struct {
	requestID graphsync.RequestID
	result    graphsync.ResponseStatusCode
}
100 101
type fakePeerResponseSender struct {
	sentResponses        chan sentResponse
102 103
	sentExtensions       chan sentExtension
	lastCompletedRequest chan completedRequest
104 105 106 107 108
}

// Startup is a no-op on the fake sender.
func (fprs *fakePeerResponseSender) Startup() {}

// Shutdown is a no-op on the fake sender.
func (fprs *fakePeerResponseSender) Shutdown() {}

109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
// fakeBlkData is a minimal block description made of a link and a size. The
// same size is reported by both BlockSize and BlockSizeOnWire.
type fakeBlkData struct {
	link ipld.Link
	size uint64
}

// Link returns the link stored in the block data.
func (fbd fakeBlkData) Link() ipld.Link {
	l := fbd.link
	return l
}

// BlockSize returns the stored size.
func (fbd fakeBlkData) BlockSize() uint64 {
	sz := fbd.size
	return sz
}

// BlockSizeOnWire returns the stored size, identical to BlockSize.
func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
	sz := fbd.size
	return sz
}

126
func (fprs *fakePeerResponseSender) SendResponse(
127
	requestID graphsync.RequestID,
128 129
	link ipld.Link,
	data []byte,
130
) graphsync.BlockData {
131
	fprs.sentResponses <- sentResponse{requestID, link, data}
132
	return fakeBlkData{link, uint64(len(data))}
133 134
}

135 136 137 138 139 140 141
// SendExtensionData publishes the request ID and extension on the
// sentExtensions channel.
func (fprs *fakePeerResponseSender) SendExtensionData(
	requestID graphsync.RequestID,
	extension graphsync.ExtensionData,
) {
	record := sentExtension{requestID: requestID, extension: extension}
	fprs.sentExtensions <- record
}

142
func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) {
143
	fprs.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull}
144 145
}

146
func (fprs *fakePeerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
147
	fprs.lastCompletedRequest <- completedRequest{requestID, status}
148 149 150
}

func TestIncomingQuery(t *testing.T) {
151 152 153 154 155 156
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()

	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
157 158
	responseManager.Startup()

159 160
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
	testutil.AssertDoesReceive(td.ctx, t, td.completedRequestChan, "Should have completed request but didn't")
161
	for i := 0; i < len(blks); i++ {
Hannah Howard's avatar
Hannah Howard committed
162
		var sentResponse sentResponse
163
		testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send responses")
Hannah Howard's avatar
Hannah Howard committed
164 165 166 167
		k := sentResponse.link.(cidlink.Link)
		blockIndex := testutil.IndexOf(blks, k.Cid)
		require.NotEqual(t, blockIndex, -1, "sent incorrect link")
		require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
168
		require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
169 170 171 172
	}
}

func TestCancellationQueryInProgress(t *testing.T) {
173 174 175 176 177
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
178
	responseManager.Startup()
179
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
180 181

	// read one block
Hannah Howard's avatar
Hannah Howard committed
182
	var sentResponse sentResponse
183
	testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send response")
Hannah Howard's avatar
Hannah Howard committed
184 185 186 187
	k := sentResponse.link.(cidlink.Link)
	blockIndex := testutil.IndexOf(blks, k.Cid)
	require.NotEqual(t, blockIndex, -1, "sent incorrect link")
	require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
188
	require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
189 190

	// send a cancellation
191 192
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
193
	}
194
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
195 196 197

	responseManager.synchronize()

198 199
	// at this point we should receive at most one more block, then traversal
	// should complete
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
	additionalBlocks := 0
	for {
		select {
		case <-td.ctx.Done():
			t.Fatal("should complete request before context closes")
		case sentResponse = <-td.sentResponses:
			k = sentResponse.link.(cidlink.Link)
			blockIndex = testutil.IndexOf(blks, k.Cid)
			require.NotEqual(t, blockIndex, -1, "did not send correct link")
			require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
			require.Equal(t, td.requestID, sentResponse.requestID, "incorrect response id")
			additionalBlocks++
		case <-td.completedRequestChan:
			require.LessOrEqual(t, additionalBlocks, 1, "should send at most 1 additional block")
			return
		}
	}
217 218 219
}

func TestEarlyCancellation(t *testing.T) {
220 221 222 223
	td := newTestData(t)
	defer td.cancel()
	td.queryQueue.popWait.Add(1)
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
224
	responseManager.Startup()
225
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
226 227

	// send a cancellation
228 229
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
230
	}
231
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
232 233 234 235

	responseManager.synchronize()

	// unblock popping from queue
236
	td.queryQueue.popWait.Done()
237

238
	timer := time.NewTimer(time.Second)
239
	// verify no responses processed
240
	testutil.AssertDoesReceiveFirst(t, timer.C, "should not process more responses", td.sentResponses, td.completedRequestChan)
241
}
242 243

func TestValidationAndExtensions(t *testing.T) {
244
	t.Run("on its own, should fail validation", func(t *testing.T) {
245 246 247
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
248
		responseManager.Startup()
249
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
250
		var lastRequest completedRequest
251
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
252 253 254 255
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
	})

	t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) {
256 257 258
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
259
		responseManager.Startup()
260 261
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
262
		})
263
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
264
		var lastRequest completedRequest
265
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
266 267
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
268 269
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
270
	})
271

272
	t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) {
273 274 275
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
276
		responseManager.Startup()
277
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
278
			hookActions.ValidateRequest()
279
			hookActions.SendExtensionData(td.extensionResponse)
280
		})
281
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
282
		var lastRequest completedRequest
283
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
284 285
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
286 287
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
288 289
	})

290
	t.Run("if any hook fails, should fail", func(t *testing.T) {
291 292 293
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
294
		responseManager.Startup()
295
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
296
			hookActions.ValidateRequest()
297
		})
298 299
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
300 301
			hookActions.TerminateWithError(errors.New("everything went to crap"))
		})
302
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
303
		var lastRequest completedRequest
304
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
305 306
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
307 308
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
309
	})
310

311
	t.Run("hooks can be unregistered", func(t *testing.T) {
312 313 314
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
315
		responseManager.Startup()
316
		unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
317
			hookActions.ValidateRequest()
318
			hookActions.SendExtensionData(td.extensionResponse)
319
		})
320 321

		// hook validates request
322
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
323
		var lastRequest completedRequest
324
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
325 326
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
327 328
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
329 330 331 332 333

		// unregister
		unregister()

		// no same request should fail
334 335
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
336
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
337
	})
338 339

	t.Run("hooks can alter the loader", func(t *testing.T) {
340 341
		td := newTestData(t)
		defer td.cancel()
342 343
		obs := make(map[ipld.Link][]byte)
		oloader, _ := testutil.NewTestStore(obs)
344
		responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
345 346
		responseManager.Startup()
		// add validating hook -- so the request SHOULD succeed
347
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
348 349 350 351 352
			hookActions.ValidateRequest()
		})

		// request fails with base loader reading from block store that's missing data
		var lastRequest completedRequest
353 354
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
355 356
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")

357
		err := td.peristenceOptions.Register("chainstore", td.loader)
358
		require.NoError(t, err)
359
		// register hook to use different loader
360 361
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
362
				hookActions.UsePersistenceOption("chainstore")
363
				hookActions.SendExtensionData(td.extensionResponse)
364 365 366
			}
		})
		// hook uses different loader that should make request succeed
367 368
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
369 370
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
371 372
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
373 374 375
	})

	t.Run("hooks can alter the node builder chooser", func(t *testing.T) {
376 377 378
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
379 380 381
		responseManager.Startup()

		customChooserCallCount := 0
382
		customChooser := func(ipld.Link, ipld.LinkContext) (ipld.NodeBuilder, error) {
383
			customChooserCallCount++
384
			return ipldfree.NodeBuilder(), nil
385 386 387
		}

		// add validating hook -- so the request SHOULD succeed
388
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
389 390 391 392 393
			hookActions.ValidateRequest()
		})

		// with default chooser, customer chooser not called
		var lastRequest completedRequest
394 395
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
396 397 398 399
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		require.Equal(t, 0, customChooserCallCount)

		// register hook to use custom chooser
400 401
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
402
				hookActions.UseNodeBuilderChooser(customChooser)
403
				hookActions.SendExtensionData(td.extensionResponse)
404 405 406 407
			}
		})

		// verify now that request succeeds and uses custom chooser
408 409
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
410 411
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
412 413
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
414 415
		require.Equal(t, 5, customChooserCallCount)
	})
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562

	t.Run("test block hook processing", func(t *testing.T) {
		t.Run("can send extension data", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.SendExtensionData(td.extensionResponse)
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
			for i := 0; i < td.blockChainLength; i++ {
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			}
		})

		t.Run("can send errors", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.TerminateWithError(errors.New("failed"))
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should succeed")
		})

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			blkIndex := 1
			blockCount := 3
			var hasPaused bool
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				if blkIndex >= blockCount && !hasPaused {
					hookActions.PauseResponse()
					hasPaused = true
				}
				blkIndex++
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			var sentResponses []sentResponse
		nomoreresponses:
			for {
				select {
				case sentResponse := <-td.sentResponses:
					sentResponses = append(sentResponses, sentResponse)
				default:
					break nomoreresponses
				}
			}
			require.LessOrEqual(t, len(sentResponses), blockCount)
			err := responseManager.UnpauseResponse(td.p, td.requestID)
			require.NoError(t, err)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

	})
}

// testData bundles the fixtures built by newTestData for a single test: a
// context with its cancel function, a block store with loader/storer and a
// generated block chain, the channels fed by the fake peer response sender,
// the fake peer manager and query queue, a sample extension and extension
// response, the request (with its ID and target peer) to process, and the
// persistence-option and hook registries passed to the response manager.
type testData struct {
	ctx                   context.Context
	cancel                func()
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChainLength      int
	blockChain            *testutil.TestBlockChain
	completedRequestChan  chan completedRequest
	sentResponses         chan sentResponse
	sentExtensions        chan sentExtension
	peerManager           *fakePeerManager
	queryQueue            *fakeQueryQueue
	extensionData         []byte
	extensionName         graphsync.ExtensionName
	extension             graphsync.ExtensionData
	extensionResponseData []byte
	extensionResponse     graphsync.ExtensionData
	requestID             graphsync.RequestID
	requests              []gsmsg.GraphSyncRequest
	p                     peer.ID
	peristenceOptions     *persistenceoptions.PersistenceOptions
	requestHooks          *requesthooks.IncomingRequestHooks
	blockHooks            *blockhooks.OutgoingBlockHooks
}

// newTestData builds a fresh set of fixtures for one test. The returned
// context times out after ten seconds; callers are expected to invoke
// td.cancel when done. The single request in td.requests targets the tip of
// the generated block chain, uses the chain's selector, and carries
// td.extension.
func newTestData(t *testing.T) testData {
	background := context.Background()
	var td testData
	td.ctx, td.cancel = context.WithTimeout(background, 10*time.Second)

	// Block store populated with a generated chain of blockChainLength blocks.
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChainLength = 5
	td.blockChain = testutil.SetupBlockChain(background, t, td.loader, td.storer, 100, td.blockChainLength)

	// Channels the fake sender writes to. The response and extension channels
	// are buffered to twice the chain length; completions have a buffer of 1.
	bufSize := td.blockChainLength * 2
	td.completedRequestChan = make(chan completedRequest, 1)
	td.sentResponses = make(chan sentResponse, bufSize)
	td.sentExtensions = make(chan sentExtension, bufSize)
	td.peerManager = &fakePeerManager{
		peerResponseSender: &fakePeerResponseSender{
			lastCompletedRequest: td.completedRequestChan,
			sentResponses:        td.sentResponses,
			sentExtensions:       td.sentExtensions,
		},
	}
	td.queryQueue = new(fakeQueryQueue)

	// Request extension and the extension sent back by hooks share one name
	// but carry independently generated random payloads.
	td.extensionData = testutil.RandomBytes(100)
	td.extensionName = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension = graphsync.ExtensionData{Name: td.extensionName, Data: td.extensionData}
	td.extensionResponseData = testutil.RandomBytes(100)
	td.extensionResponse = graphsync.ExtensionData{Name: td.extensionName, Data: td.extensionResponseData}

	td.requestID = graphsync.RequestID(rand.Int31())
	tipCid := td.blockChain.TipLink.(cidlink.Link).Cid
	request := gsmsg.NewRequest(td.requestID, tipCid, td.blockChain.Selector(), graphsync.Priority(0), td.extension)
	td.requests = []gsmsg.GraphSyncRequest{request}
	td.p = testutil.GeneratePeers(1)[0]

	td.peristenceOptions = persistenceoptions.New()
	td.requestHooks = requesthooks.New(td.peristenceOptions)
	td.blockHooks = blockhooks.New()
	return td
}