responsemanager_test.go 22.6 KB
Newer Older
1 2 3 4
package responsemanager

import (
	"context"
5
	"errors"
6 7 8 9 10
	"math/rand"
	"sync"
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12
	gsmsg "github.com/ipfs/go-graphsync/message"
13
	"github.com/ipfs/go-graphsync/responsemanager/blockhooks"
14
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
15 16
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
	"github.com/ipfs/go-graphsync/responsemanager/requesthooks"
17
	"github.com/ipfs/go-graphsync/selectorvalidator"
18
	"github.com/ipfs/go-graphsync/testutil"
19
	"github.com/ipfs/go-peertaskqueue/peertask"
20
	ipld "github.com/ipld/go-ipld-prime"
21
	ipldfree "github.com/ipld/go-ipld-prime/impl/free"
22
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
23
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
24
	"github.com/stretchr/testify/require"
25 26 27 28 29
)

type fakeQueryQueue struct {
	popWait   sync.WaitGroup
	queriesLk sync.RWMutex
30
	queries   []*peertask.QueueTask
31 32
}

33
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
34
	fqq.queriesLk.Lock()
35 36 37 38 39 40

	// This isn't quite right as the queue should deduplicate requests, but
	// it's good enough.
	for _, task := range tasks {
		fqq.queries = append(fqq.queries, peertask.NewQueueTask(task, to, time.Now()))
	}
41 42 43
	fqq.queriesLk.Unlock()
}

44
func (fqq *fakeQueryQueue) PopTasks(targetWork int) (peer.ID, []*peertask.Task, int) {
45 46 47 48
	fqq.popWait.Wait()
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	if len(fqq.queries) == 0 {
49
		return "", nil, -1
50
	}
51 52
	// We're not bothering to implement "work"
	task := fqq.queries[0]
53
	fqq.queries = fqq.queries[1:]
54
	return task.Target, []*peertask.Task{&task.Task}, 0
55 56
}

57
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
58 59 60
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	for i, query := range fqq.queries {
61 62
		if query.Target == p && query.Topic == topic {
			fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
63 64 65 66
		}
	}
}

67 68 69 70
// TasksDone is part of the queue surface used by the response manager; the
// fake ignores both the peer and the tasks.
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
	// We don't track active tasks so this is a no-op
}

71 72 73
// ThawRound is a no-op in the fake queue.
func (fqq *fakeQueryQueue) ThawRound() {

}
74 75 76 77 78 79 80 81 82 83 84 85

// fakePeerManager hands out a single, fixed PeerResponseSender regardless of
// which peer is asked for, and remembers the most recently requested peer in
// lastPeer.
type fakePeerManager struct {
	lastPeer           peer.ID
	peerResponseSender peerresponsemanager.PeerResponseSender
}

// SenderForPeer records p as the last requested peer and returns the one
// configured sender. lastPeer is written without synchronization.
func (fpm *fakePeerManager) SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender {
	fpm.lastPeer = p
	return fpm.peerResponseSender
}

type sentResponse struct {
86
	requestID graphsync.RequestID
87 88 89 90
	link      ipld.Link
	data      []byte
}

91 92 93 94 95 96 97 98 99
// sentExtension captures the arguments of one SendExtensionData call made on
// the fake sender.
type sentExtension struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

// completedRequest records that the fake sender was told a request finished,
// along with the status code it finished with.
type completedRequest struct {
	requestID graphsync.RequestID
	result    graphsync.ResponseStatusCode
}
100 101
type fakePeerResponseSender struct {
	sentResponses        chan sentResponse
102 103
	sentExtensions       chan sentExtension
	lastCompletedRequest chan completedRequest
104 105 106 107 108
}

// Startup and Shutdown are no-ops: the fake sender has no background work to
// start or stop.
func (fprs *fakePeerResponseSender) Startup()  {}
func (fprs *fakePeerResponseSender) Shutdown() {}

109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
// fakeBlkData is the block-data value the fake sender returns from
// SendResponse: a link plus a single size figure. It is built positionally,
// so field order must not change.
type fakeBlkData struct {
	link ipld.Link
	size uint64
}

// Link returns the link this block data was created with.
func (fbd fakeBlkData) Link() ipld.Link {
	return fbd.link
}

// BlockSize returns the stored size.
func (fbd fakeBlkData) BlockSize() uint64 {
	return fbd.size
}

// BlockSizeOnWire returns the same stored size as BlockSize; the fake does
// not distinguish between the two.
func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
	return fbd.size
}

126
func (fprs *fakePeerResponseSender) SendResponse(
127
	requestID graphsync.RequestID,
128 129
	link ipld.Link,
	data []byte,
130
) graphsync.BlockData {
131
	fprs.sentResponses <- sentResponse{requestID, link, data}
132
	return fakeBlkData{link, uint64(len(data))}
133 134
}

135 136 137 138 139 140 141
// SendExtensionData publishes the call's arguments on sentExtensions so the
// test can assert on them. The send blocks if the channel is full.
func (fprs *fakePeerResponseSender) SendExtensionData(
	requestID graphsync.RequestID,
	extension graphsync.ExtensionData,
) {
	fprs.sentExtensions <- sentExtension{requestID, extension}
}

142
func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) {
143
	fprs.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull}
144 145
}

146
func (fprs *fakePeerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
147
	fprs.lastCompletedRequest <- completedRequest{requestID, status}
148 149 150
}

func TestIncomingQuery(t *testing.T) {
151 152 153 154 155 156
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()

	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
157 158
	responseManager.Startup()

159 160
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
	testutil.AssertDoesReceive(td.ctx, t, td.completedRequestChan, "Should have completed request but didn't")
161
	for i := 0; i < len(blks); i++ {
Hannah Howard's avatar
Hannah Howard committed
162
		var sentResponse sentResponse
163
		testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send responses")
Hannah Howard's avatar
Hannah Howard committed
164 165 166 167
		k := sentResponse.link.(cidlink.Link)
		blockIndex := testutil.IndexOf(blks, k.Cid)
		require.NotEqual(t, blockIndex, -1, "sent incorrect link")
		require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
168
		require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
169 170 171 172
	}
}

func TestCancellationQueryInProgress(t *testing.T) {
173 174 175 176 177
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
178
	responseManager.Startup()
179
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
180 181

	// read one block
Hannah Howard's avatar
Hannah Howard committed
182
	var sentResponse sentResponse
183
	testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send response")
Hannah Howard's avatar
Hannah Howard committed
184 185 186 187
	k := sentResponse.link.(cidlink.Link)
	blockIndex := testutil.IndexOf(blks, k.Cid)
	require.NotEqual(t, blockIndex, -1, "sent incorrect link")
	require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
188
	require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
189 190

	// send a cancellation
191 192
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
193
	}
194
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
195 196 197

	responseManager.synchronize()

198 199
	// at this point we should receive at most one more block, then traversal
	// should complete
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
	additionalBlocks := 0
	for {
		select {
		case <-td.ctx.Done():
			t.Fatal("should complete request before context closes")
		case sentResponse = <-td.sentResponses:
			k = sentResponse.link.(cidlink.Link)
			blockIndex = testutil.IndexOf(blks, k.Cid)
			require.NotEqual(t, blockIndex, -1, "did not send correct link")
			require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
			require.Equal(t, td.requestID, sentResponse.requestID, "incorrect response id")
			additionalBlocks++
		case <-td.completedRequestChan:
			require.LessOrEqual(t, additionalBlocks, 1, "should send at most 1 additional block")
			return
		}
	}
217 218 219
}

func TestEarlyCancellation(t *testing.T) {
220 221 222 223
	td := newTestData(t)
	defer td.cancel()
	td.queryQueue.popWait.Add(1)
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
224
	responseManager.Startup()
225
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
226 227

	// send a cancellation
228 229
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
230
	}
231
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
232 233 234 235

	responseManager.synchronize()

	// unblock popping from queue
236
	td.queryQueue.popWait.Done()
237

238
	timer := time.NewTimer(time.Second)
239
	// verify no responses processed
240
	testutil.AssertDoesReceiveFirst(t, timer.C, "should not process more responses", td.sentResponses, td.completedRequestChan)
241
}
242 243

func TestValidationAndExtensions(t *testing.T) {
244
	t.Run("on its own, should fail validation", func(t *testing.T) {
245 246 247
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
248
		responseManager.Startup()
249
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
250
		var lastRequest completedRequest
251
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
252 253 254 255
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
	})

	t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) {
256 257 258
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
259
		responseManager.Startup()
260 261
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
262
		})
263
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
264
		var lastRequest completedRequest
265
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
266 267
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
268 269
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
270
	})
271

272
	t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) {
273 274 275
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
276
		responseManager.Startup()
277
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
278
			hookActions.ValidateRequest()
279
			hookActions.SendExtensionData(td.extensionResponse)
280
		})
281
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
282
		var lastRequest completedRequest
283
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
284 285
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
286 287
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
288 289
	})

290
	t.Run("if any hook fails, should fail", func(t *testing.T) {
291 292 293
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
294
		responseManager.Startup()
295
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
296
			hookActions.ValidateRequest()
297
		})
298 299
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
300 301
			hookActions.TerminateWithError(errors.New("everything went to crap"))
		})
302
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
303
		var lastRequest completedRequest
304
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
305 306
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
307 308
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
309
	})
310

311
	t.Run("hooks can be unregistered", func(t *testing.T) {
312 313 314
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
315
		responseManager.Startup()
316
		unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
317
			hookActions.ValidateRequest()
318
			hookActions.SendExtensionData(td.extensionResponse)
319
		})
320 321

		// hook validates request
322
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
323
		var lastRequest completedRequest
324
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
325 326
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
327 328
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
329 330 331 332 333

		// unregister
		unregister()

		// no same request should fail
334 335
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
336
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
337
	})
338 339

	t.Run("hooks can alter the loader", func(t *testing.T) {
340 341
		td := newTestData(t)
		defer td.cancel()
342 343
		obs := make(map[ipld.Link][]byte)
		oloader, _ := testutil.NewTestStore(obs)
344
		responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
345 346
		responseManager.Startup()
		// add validating hook -- so the request SHOULD succeed
347
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
348 349 350 351 352
			hookActions.ValidateRequest()
		})

		// request fails with base loader reading from block store that's missing data
		var lastRequest completedRequest
353 354
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
355 356
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")

357
		err := td.peristenceOptions.Register("chainstore", td.loader)
358
		require.NoError(t, err)
359
		// register hook to use different loader
360 361
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
362
				hookActions.UsePersistenceOption("chainstore")
363
				hookActions.SendExtensionData(td.extensionResponse)
364 365 366
			}
		})
		// hook uses different loader that should make request succeed
367 368
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
369 370
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
371 372
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
373 374 375
	})

	t.Run("hooks can alter the node builder chooser", func(t *testing.T) {
376 377 378
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
379 380 381
		responseManager.Startup()

		customChooserCallCount := 0
382
		customChooser := func(ipld.Link, ipld.LinkContext) (ipld.NodeBuilder, error) {
383
			customChooserCallCount++
384
			return ipldfree.NodeBuilder(), nil
385 386 387
		}

		// add validating hook -- so the request SHOULD succeed
388
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
389 390 391 392 393
			hookActions.ValidateRequest()
		})

		// with default chooser, customer chooser not called
		var lastRequest completedRequest
394 395
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
396 397 398 399
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		require.Equal(t, 0, customChooserCallCount)

		// register hook to use custom chooser
400 401
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
402
				hookActions.UseNodeBuilderChooser(customChooser)
403
				hookActions.SendExtensionData(td.extensionResponse)
404 405 406 407
			}
		})

		// verify now that request succeeds and uses custom chooser
408 409
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
410 411
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
412 413
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
414 415
		require.Equal(t, 5, customChooserCallCount)
	})


	t.Run("test block hook processing", func(t *testing.T) {
		t.Run("can send extension data", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.SendExtensionData(td.extensionResponse)
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
			for i := 0; i < td.blockChainLength; i++ {
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			}
		})

		t.Run("can send errors", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.TerminateWithError(errors.New("failed"))
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should succeed")
		})

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks)
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			blkIndex := 1
			blockCount := 3
			var hasPaused bool
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				if blkIndex >= blockCount && !hasPaused {
					hookActions.PauseResponse()
					hasPaused = true
				}
				blkIndex++
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			var sentResponses []sentResponse
		nomoreresponses:
			for {
				select {
				case sentResponse := <-td.sentResponses:
					sentResponses = append(sentResponses, sentResponse)
				default:
					break nomoreresponses
				}
			}
			require.LessOrEqual(t, len(sentResponses), blockCount)
			err := responseManager.UnpauseResponse(td.p, td.requestID)
			require.NoError(t, err)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

	})
}

// testData bundles everything a response manager test needs: a timeout
// context and its cancel func, an in-memory block store with its loader and
// storer, the generated block chain, the channels fed by the fake peer
// response sender, the fake peer manager and query queue, the request and
// response extension payloads, the request under test, the requesting peer,
// and the hook registries.
//
// NOTE(review): the field name peristenceOptions is misspelled ("persistence")
// but is referenced by that name elsewhere in this file.
type testData struct {
	ctx                   context.Context
	cancel                func()
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChainLength      int
	blockChain            *testutil.TestBlockChain
	completedRequestChan  chan completedRequest
	sentResponses         chan sentResponse
	sentExtensions        chan sentExtension
	peerManager           *fakePeerManager
	queryQueue            *fakeQueryQueue
	extensionData         []byte
	extensionName         graphsync.ExtensionName
	extension             graphsync.ExtensionData
	extensionResponseData []byte
	extensionResponse     graphsync.ExtensionData
	requestID             graphsync.RequestID
	requests              []gsmsg.GraphSyncRequest
	p                     peer.ID
	peristenceOptions     *persistenceoptions.PersistenceOptions
	requestHooks          *requesthooks.IncomingRequestHooks
	blockHooks            *blockhooks.OutgoingBlockHooks
}

// newTestData builds a fresh fixture for one test: a 10-second timeout
// context, an in-memory store holding a 5-block chain, a fake sender wired to
// buffered observation channels, a single request for the chain tip carrying
// the test extension, a generated peer, and empty hook registries.
//
// The caller is responsible for calling td.cancel.
func newTestData(t *testing.T) testData {
	t.Helper()
	ctx := context.Background()
	td := testData{}
	td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second)

	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChainLength = 5
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, td.blockChainLength)

	// The response and extension channels are buffered at twice the chain
	// length so the fake sender's channel sends have room before a test
	// starts reading.
	td.completedRequestChan = make(chan completedRequest, 1)
	td.sentResponses = make(chan sentResponse, td.blockChainLength*2)
	td.sentExtensions = make(chan sentExtension, td.blockChainLength*2)
	fprs := &fakePeerResponseSender{lastCompletedRequest: td.completedRequestChan, sentResponses: td.sentResponses, sentExtensions: td.sentExtensions}
	td.peerManager = &fakePeerManager{peerResponseSender: fprs}
	td.queryQueue = &fakeQueryQueue{}

	td.extensionData = testutil.RandomBytes(100)
	td.extensionName = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionData,
	}
	td.extensionResponseData = testutil.RandomBytes(100)
	td.extensionResponse = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionResponseData,
	}

	td.requestID = graphsync.RequestID(rand.Int31())
	td.requests = []gsmsg.GraphSyncRequest{
		gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), td.extension),
	}
	td.p = testutil.GeneratePeers(1)[0]
	td.peristenceOptions = persistenceoptions.New()
	td.requestHooks = requesthooks.New(td.peristenceOptions)
	td.blockHooks = blockhooks.New()
	return td
}