responsemanager_test.go 37.5 KB
Newer Older
1 2 3 4
package responsemanager

import (
	"context"
5
	"errors"
6 7 8 9 10
	"math/rand"
	"sync"
	"testing"
	"time"

11
	"github.com/ipfs/go-cid"
12
	"github.com/ipfs/go-graphsync"
13
	"github.com/ipfs/go-graphsync/cidset"
14
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
15
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
16
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
17
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
18
	"github.com/ipfs/go-graphsync/selectorvalidator"
19
	"github.com/ipfs/go-graphsync/testutil"
20
	"github.com/ipfs/go-peertaskqueue/peertask"
21 22
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Hannah Howard's avatar
Hannah Howard committed
23
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
24
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
25
	"github.com/stretchr/testify/require"
26 27 28 29 30
)

type fakeQueryQueue struct {
	popWait   sync.WaitGroup
	queriesLk sync.RWMutex
31
	queries   []*peertask.QueueTask
32 33
}

34
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
35
	fqq.queriesLk.Lock()
36 37 38 39 40 41

	// This isn't quite right as the queue should deduplicate requests, but
	// it's good enough.
	for _, task := range tasks {
		fqq.queries = append(fqq.queries, peertask.NewQueueTask(task, to, time.Now()))
	}
42 43 44
	fqq.queriesLk.Unlock()
}

45
func (fqq *fakeQueryQueue) PopTasks(targetWork int) (peer.ID, []*peertask.Task, int) {
46 47 48 49
	fqq.popWait.Wait()
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	if len(fqq.queries) == 0 {
50
		return "", nil, -1
51
	}
52 53
	// We're not bothering to implement "work"
	task := fqq.queries[0]
54
	fqq.queries = fqq.queries[1:]
55
	return task.Target, []*peertask.Task{&task.Task}, 0
56 57
}

58
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
59 60 61
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	for i, query := range fqq.queries {
62 63
		if query.Target == p && query.Topic == topic {
			fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
64 65 66 67
		}
	}
}

68 69 70 71
// TasksDone does nothing; both arguments are ignored.
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
	// We don't track active tasks so this is a no-op
}

72 73 74
// ThawRound does nothing in this fake.
func (fqq *fakeQueryQueue) ThawRound() {

}
75 76 77 78 79 80 81 82 83 84 85 86

// fakePeerManager is a test double that returns the same preconfigured
// peerResponseSender for every peer and records, in lastPeer, the peer most
// recently passed to SenderForPeer.
type fakePeerManager struct {
	lastPeer           peer.ID
	peerResponseSender peerresponsemanager.PeerResponseSender
}

// SenderForPeer remembers p as the last peer requested and hands back the
// single sender this fake was configured with, regardless of which peer is
// asked for.
func (fpm *fakePeerManager) SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender {
	sender := fpm.peerResponseSender
	fpm.lastPeer = p
	return sender
}

type sentResponse struct {
87
	requestID graphsync.RequestID
88 89 90 91
	link      ipld.Link
	data      []byte
}

92 93 94 95 96 97 98 99 100
// sentExtension records the arguments of one SendExtensionData call made on
// the fake peer response sender.
type sentExtension struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

// completedRequest records a request that the fake peer response sender was
// asked to finish, via FinishRequest or FinishWithError, together with the
// status code it finished with.
type completedRequest struct {
	requestID graphsync.RequestID
	result    graphsync.ResponseStatusCode
}
Hannah Howard's avatar
Hannah Howard committed
101 102 103 104
// pausedRequest records one PauseRequest call made on the fake peer response
// sender.
type pausedRequest struct {
	requestID graphsync.RequestID
}

105 106
type fakePeerResponseSender struct {
	sentResponses        chan sentResponse
107 108
	sentExtensions       chan sentExtension
	lastCompletedRequest chan completedRequest
Hannah Howard's avatar
Hannah Howard committed
109
	pausedRequests       chan pausedRequest
110
	ignoredLinks         chan []ipld.Link
111 112 113 114 115
}

// Startup does nothing in this fake.
func (fprs *fakePeerResponseSender) Startup()  {}
// Shutdown does nothing in this fake.
func (fprs *fakePeerResponseSender) Shutdown() {}

116 117 118 119 120
// fakeBlkData is the block-data value returned by the fake sender's
// SendResponse: a link plus a single size that is reported both as the block
// size and as the size on the wire. Field order matters because the value is
// built with an unkeyed composite literal elsewhere in this file.
type fakeBlkData struct {
	link ipld.Link
	size uint64
}

121 122 123 124
// IgnoreBlocks publishes links on the ignoredLinks channel. requestID is not
// recorded. The send blocks until the channel can accept the value.
func (fprs *fakePeerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
	fprs.ignoredLinks <- links
}

125 126 127 128 129 130 131 132 133 134 135 136
// Link returns the link stored in this block data.
func (fbd fakeBlkData) Link() ipld.Link {
	return fbd.link
}

// BlockSize returns the stored size.
func (fbd fakeBlkData) BlockSize() uint64 {
	return fbd.size
}

// BlockSizeOnWire reports the same stored size as BlockSize; this fake makes
// no distinction between the two.
func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
	onWire := fbd.size
	return onWire
}

137
func (fprs *fakePeerResponseSender) SendResponse(
138
	requestID graphsync.RequestID,
139 140
	link ipld.Link,
	data []byte,
141
) graphsync.BlockData {
142
	fprs.sentResponses <- sentResponse{requestID, link, data}
143
	return fakeBlkData{link, uint64(len(data))}
144 145
}

146 147 148 149 150 151 152
// SendExtensionData publishes the request ID and extension on the
// sentExtensions channel, blocking until the channel can accept the value.
func (fprs *fakePeerResponseSender) SendExtensionData(
	requestID graphsync.RequestID,
	extension graphsync.ExtensionData,
) {
	record := sentExtension{requestID: requestID, extension: extension}
	fprs.sentExtensions <- record
}

153
func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
154
	fprs.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull}
155
	return graphsync.RequestCompletedFull
156 157
}

158
func (fprs *fakePeerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
159
	fprs.lastCompletedRequest <- completedRequest{requestID, status}
160 161
}

Hannah Howard's avatar
Hannah Howard committed
162 163 164 165
// PauseRequest publishes requestID on the pausedRequests channel, blocking
// until the channel can accept the value.
func (fprs *fakePeerResponseSender) PauseRequest(requestID graphsync.RequestID) {
	record := pausedRequest{requestID: requestID}
	fprs.pausedRequests <- record
}

Hannah Howard's avatar
Hannah Howard committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
// Transaction invokes transaction with a sender bound to requestID that
// forwards every call back to this fake, and returns whatever error the
// callback returns.
func (fprs *fakePeerResponseSender) Transaction(requestID graphsync.RequestID, transaction peerresponsemanager.Transaction) error {
	return transaction(&fakePeerResponseTransactionSender{
		requestID: requestID,
		prs:       fprs,
	})
}

// fakePeerResponseTransactionSender binds a request ID to a peer response
// sender, so its methods can forward to prs without the caller repeating the
// ID. Field order matters because the value is built with an unkeyed
// composite literal elsewhere in this file.
type fakePeerResponseTransactionSender struct {
	requestID graphsync.RequestID
	prs       peerresponsemanager.PeerResponseSender
}

// SendResponse forwards link and data to the wrapped sender under the bound
// request ID and returns the block data the wrapped sender produces.
func (fprts *fakePeerResponseTransactionSender) SendResponse(link ipld.Link, data []byte) graphsync.BlockData {
	id := fprts.requestID
	return fprts.prs.SendResponse(id, link, data)
}

// SendExtensionData forwards extension to the wrapped sender under the bound
// request ID.
func (fprts *fakePeerResponseTransactionSender) SendExtensionData(extension graphsync.ExtensionData) {
	id := fprts.requestID
	fprts.prs.SendExtensionData(id, extension)
}

// FinishRequest asks the wrapped sender to finish the bound request and
// returns the status code it reports.
func (fprts *fakePeerResponseTransactionSender) FinishRequest() graphsync.ResponseStatusCode {
	id := fprts.requestID
	return fprts.prs.FinishRequest(id)
}

// FinishWithError asks the wrapped sender to finish the bound request with
// the given status.
func (fprts *fakePeerResponseTransactionSender) FinishWithError(status graphsync.ResponseStatusCode) {
	id := fprts.requestID
	fprts.prs.FinishWithError(id, status)
}

// PauseRequest asks the wrapped sender to pause the bound request.
func (fprts *fakePeerResponseTransactionSender) PauseRequest() {
	id := fprts.requestID
	fprts.prs.PauseRequest(id)
}

196
func TestIncomingQuery(t *testing.T) {
197 198 199 200
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()

201
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
202
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
203 204
	responseManager.Startup()

205 206
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
	testutil.AssertDoesReceive(td.ctx, t, td.completedRequestChan, "Should have completed request but didn't")
207
	for i := 0; i < len(blks); i++ {
Hannah Howard's avatar
Hannah Howard committed
208
		var sentResponse sentResponse
209
		testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send responses")
Hannah Howard's avatar
Hannah Howard committed
210 211 212 213
		k := sentResponse.link.(cidlink.Link)
		blockIndex := testutil.IndexOf(blks, k.Cid)
		require.NotEqual(t, blockIndex, -1, "sent incorrect link")
		require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
214
		require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
215 216 217 218
	}
}

func TestCancellationQueryInProgress(t *testing.T) {
219 220 221
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()
222
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
223
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
224
	responseManager.Startup()
225
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
226 227

	// read one block
Hannah Howard's avatar
Hannah Howard committed
228
	var sentResponse sentResponse
229
	testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send response")
Hannah Howard's avatar
Hannah Howard committed
230 231 232 233
	k := sentResponse.link.(cidlink.Link)
	blockIndex := testutil.IndexOf(blks, k.Cid)
	require.NotEqual(t, blockIndex, -1, "sent incorrect link")
	require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
234
	require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
235 236

	// send a cancellation
237 238
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
239
	}
240
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
241 242 243

	responseManager.synchronize()

244 245
	// at this point we should receive at most one more block, then traversal
	// should complete
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
	additionalBlocks := 0
	for {
		select {
		case <-td.ctx.Done():
			t.Fatal("should complete request before context closes")
		case sentResponse = <-td.sentResponses:
			k = sentResponse.link.(cidlink.Link)
			blockIndex = testutil.IndexOf(blks, k.Cid)
			require.NotEqual(t, blockIndex, -1, "did not send correct link")
			require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
			require.Equal(t, td.requestID, sentResponse.requestID, "incorrect response id")
			additionalBlocks++
		case <-td.completedRequestChan:
			require.LessOrEqual(t, additionalBlocks, 1, "should send at most 1 additional block")
			return
		}
	}
263 264 265
}

func TestEarlyCancellation(t *testing.T) {
266 267 268
	td := newTestData(t)
	defer td.cancel()
	td.queryQueue.popWait.Add(1)
269
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
270
	responseManager.Startup()
271
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
272 273

	// send a cancellation
274 275
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
276
	}
277
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
278 279 280 281

	responseManager.synchronize()

	// unblock popping from queue
282
	td.queryQueue.popWait.Done()
283

284
	timer := time.NewTimer(time.Second)
285
	// verify no responses processed
286
	testutil.AssertDoesReceiveFirst(t, timer.C, "should not process more responses", td.sentResponses, td.completedRequestChan)
287
}
288 289

func TestValidationAndExtensions(t *testing.T) {
290
	t.Run("on its own, should fail validation", func(t *testing.T) {
291 292
		td := newTestData(t)
		defer td.cancel()
293
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
294
		responseManager.Startup()
295
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
296
		var lastRequest completedRequest
297
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
298 299 300 301
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
	})

	t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) {
302 303
		td := newTestData(t)
		defer td.cancel()
304
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
305
		responseManager.Startup()
306 307
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
308
		})
309
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
310
		var lastRequest completedRequest
311
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
312 313
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
314 315
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
316
	})
317

318
	t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) {
319 320
		td := newTestData(t)
		defer td.cancel()
321
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
322
		responseManager.Startup()
323
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
324
			hookActions.ValidateRequest()
325
			hookActions.SendExtensionData(td.extensionResponse)
326
		})
327
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
328
		var lastRequest completedRequest
329
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
330 331
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
332 333
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
334 335
	})

336
	t.Run("if any hook fails, should fail", func(t *testing.T) {
337 338
		td := newTestData(t)
		defer td.cancel()
339
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
340
		responseManager.Startup()
341
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
342
			hookActions.ValidateRequest()
343
		})
344 345
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
346 347
			hookActions.TerminateWithError(errors.New("everything went to crap"))
		})
348
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
349
		var lastRequest completedRequest
350
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
351 352
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
353 354
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
355
	})
356

357
	t.Run("hooks can be unregistered", func(t *testing.T) {
358 359
		td := newTestData(t)
		defer td.cancel()
360
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
361
		responseManager.Startup()
362
		unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
363
			hookActions.ValidateRequest()
364
			hookActions.SendExtensionData(td.extensionResponse)
365
		})
366 367

		// hook validates request
368
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
369
		var lastRequest completedRequest
370
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
371 372
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
373 374
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
375 376 377 378 379

		// unregister
		unregister()

		// now the same request should fail
380 381
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
382
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
383
	})
384 385

	t.Run("hooks can alter the loader", func(t *testing.T) {
386 387
		td := newTestData(t)
		defer td.cancel()
388 389
		obs := make(map[ipld.Link][]byte)
		oloader, _ := testutil.NewTestStore(obs)
390
		responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
391 392
		responseManager.Startup()
		// add validating hook -- so the request SHOULD succeed
393
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
394 395 396 397 398
			hookActions.ValidateRequest()
		})

		// request fails with base loader reading from block store that's missing data
		var lastRequest completedRequest
399 400
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
401 402
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")

403
		err := td.peristenceOptions.Register("chainstore", td.loader)
404
		require.NoError(t, err)
405
		// register hook to use different loader
406 407
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
408
				hookActions.UsePersistenceOption("chainstore")
409
				hookActions.SendExtensionData(td.extensionResponse)
410 411 412
			}
		})
		// hook uses different loader that should make request succeed
413 414
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
415 416
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
417 418
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
419 420 421
	})

	t.Run("hooks can alter the node builder chooser", func(t *testing.T) {
422 423
		td := newTestData(t)
		defer td.cancel()
424
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
425 426 427
		responseManager.Startup()

		customChooserCallCount := 0
Hannah Howard's avatar
Hannah Howard committed
428
		customChooser := func(ipld.Link, ipld.LinkContext) (ipld.NodeStyle, error) {
429
			customChooserCallCount++
Hannah Howard's avatar
Hannah Howard committed
430
			return basicnode.Style.Any, nil
431 432 433
		}

		// add validating hook -- so the request SHOULD succeed
434
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
435 436 437 438 439
			hookActions.ValidateRequest()
		})

		// with default chooser, custom chooser not called
		var lastRequest completedRequest
440 441
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
442 443 444 445
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		require.Equal(t, 0, customChooserCallCount)

		// register hook to use custom chooser
446 447
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
Hannah Howard's avatar
Hannah Howard committed
448
				hookActions.UseLinkTargetNodeStyleChooser(customChooser)
449
				hookActions.SendExtensionData(td.extensionResponse)
450 451 452 453
			}
		})

		// verify now that request succeeds and uses custom chooser
454 455
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
456 457
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
458 459
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
460 461
		require.Equal(t, 5, customChooserCallCount)
	})
462

463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495
	t.Run("do-not-send-cids extension", func(t *testing.T) {
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
		responseManager.Startup()
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.ValidateRequest()
		})
		set := cid.NewSet()
		blks := td.blockChain.Blocks(0, 5)
		for _, blk := range blks {
			set.Add(blk.Cid())
		}
		data, err := cidset.EncodeCidSet(set)
		require.NoError(t, err)
		requests := []gsmsg.GraphSyncRequest{
			gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0),
				graphsync.ExtensionData{
					Name: graphsync.ExtensionDoNotSendCIDs,
					Data: data,
				}),
		}
		responseManager.ProcessRequests(td.ctx, td.p, requests)
		var lastRequest completedRequest
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var lastLinks []ipld.Link
		testutil.AssertReceive(td.ctx, t, td.ignoredLinks, &lastLinks, "should send ignored links")
		require.Len(t, lastLinks, set.Len())
		for _, link := range lastLinks {
			require.True(t, set.Has(link.(cidlink.Link).Cid))
		}
	})
496 497 498 499
	t.Run("test block hook processing", func(t *testing.T) {
		t.Run("can send extension data", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
500
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.SendExtensionData(td.extensionResponse)
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
			for i := 0; i < td.blockChainLength; i++ {
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			}
		})

		t.Run("can send errors", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
522
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
523 524 525 526 527 528 529 530 531 532
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.TerminateWithError(errors.New("failed"))
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
Hannah Howard's avatar
Hannah Howard committed
533
			require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
534 535 536 537 538
		})

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
539
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
540 541 542 543
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
Hannah Howard's avatar
Hannah Howard committed
544
			blkIndex := 0
545 546
			blockCount := 3
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
Hannah Howard's avatar
Hannah Howard committed
547 548
				blkIndex++
				if blkIndex == blockCount {
549 550
					hookActions.PauseResponse()
				}
Hannah Howard's avatar
Hannah Howard committed
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			for i := 0; i < blockCount; i++ {
				testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
			}
			testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
			var pausedRequest pausedRequest
			testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
			err := responseManager.UnpauseResponse(td.p, td.requestID)
			require.NoError(t, err)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

	})

	t.Run("test update hook processing", func(t *testing.T) {

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
575
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
576 577 578 579 580 581 582
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			blkIndex := 0
			blockCount := 3
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
583
				blkIndex++
Hannah Howard's avatar
Hannah Howard committed
584 585 586 587 588 589 590 591
				if blkIndex == blockCount {
					hookActions.PauseResponse()
				}
			})
			td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
				if _, found := updateData.Extension(td.extensionName); found {
					hookActions.UnpauseResponse()
				}
592 593 594 595 596
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			var sentResponses []sentResponse
Hannah Howard's avatar
Hannah Howard committed
597 598
			for i := 0; i < blockCount; i++ {
				testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
599
			}
Hannah Howard's avatar
Hannah Howard committed
600 601 602
			testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
			var pausedRequest pausedRequest
			testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
603
			require.LessOrEqual(t, len(sentResponses), blockCount)
Hannah Howard's avatar
Hannah Howard committed
604
			responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
605 606 607 608 609
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

Hannah Howard's avatar
Hannah Howard committed
610 611 612 613
		t.Run("can send extension data", func(t *testing.T) {
			t.Run("when unpaused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
614
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				wait := make(chan struct{})
				sent := make(chan struct{})
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						close(sent)
						<-wait
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.SendExtensionData(td.extensionResponse)
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
				responseManager.synchronize()
				close(wait)
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			})

			t.Run("when paused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
651
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						hookActions.PauseResponse()
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.SendExtensionData(td.extensionResponse)
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				var sentResponses []sentResponse
				for i := 0; i < blockCount; i++ {
					testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
				}
				testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
				var pausedRequest pausedRequest
				testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
				require.LessOrEqual(t, len(sentResponses), blockCount)

				// send update
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

				// receive data
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")

				// should still be paused
				timer := time.NewTimer(500 * time.Millisecond)
				testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			})
		})

		t.Run("can send errors", func(t *testing.T) {
			t.Run("when unpaused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
696
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				wait := make(chan struct{})
				sent := make(chan struct{})
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						close(sent)
						<-wait
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.TerminateWithError(errors.New("something went wrong"))
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
				responseManager.synchronize()
				close(wait)
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
			})

			t.Run("when paused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
730
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						hookActions.PauseResponse()
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.TerminateWithError(errors.New("something went wrong"))
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				var sentResponses []sentResponse
				for i := 0; i < blockCount; i++ {
					testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
				}
				testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
				var pausedRequest pausedRequest
				testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
				require.LessOrEqual(t, len(sentResponses), blockCount)

				// send update
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

				// should terminate
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")

				// cannot unpause
				err := responseManager.UnpauseResponse(td.p, td.requestID)
				require.Error(t, err)
			})
		})

772
	})
773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795
	t.Run("final response status listeners", func(t *testing.T) {
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
		responseManager.Startup()
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.ValidateRequest()
		})
		statusChan := make(chan graphsync.ResponseStatusCode, 1)
		td.completedListeners.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode) {
			select {
			case statusChan <- status:
			default:
			}
		})
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		var lastRequest completedRequest
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var status graphsync.ResponseStatusCode
		testutil.AssertReceive(td.ctx, t, statusChan, &status, "should receive status")
		require.True(t, gsmsg.IsTerminalSuccessCode(status), "request should succeed")
	})
796 797 798 799 800 801 802 803 804 805 806 807 808
}

type testData struct {
	ctx                   context.Context
	cancel                func()
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChainLength      int
	blockChain            *testutil.TestBlockChain
	completedRequestChan  chan completedRequest
	sentResponses         chan sentResponse
	sentExtensions        chan sentExtension
Hannah Howard's avatar
Hannah Howard committed
809
	pausedRequests        chan pausedRequest
810
	ignoredLinks          chan []ipld.Link
811 812 813 814 815 816 817
	peerManager           *fakePeerManager
	queryQueue            *fakeQueryQueue
	extensionData         []byte
	extensionName         graphsync.ExtensionName
	extension             graphsync.ExtensionData
	extensionResponseData []byte
	extensionResponse     graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
818 819
	extensionUpdateData   []byte
	extensionUpdate       graphsync.ExtensionData
820 821
	requestID             graphsync.RequestID
	requests              []gsmsg.GraphSyncRequest
Hannah Howard's avatar
Hannah Howard committed
822
	updateRequests        []gsmsg.GraphSyncRequest
823 824
	p                     peer.ID
	peristenceOptions     *persistenceoptions.PersistenceOptions
Hannah Howard's avatar
Hannah Howard committed
825 826 827
	requestHooks          *hooks.IncomingRequestHooks
	blockHooks            *hooks.OutgoingBlockHooks
	updateHooks           *hooks.RequestUpdatedHooks
828
	completedListeners    *hooks.CompletedResponseListeners
829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
}

func newTestData(t *testing.T) testData {
	ctx := context.Background()
	td := testData{}
	td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second)

	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChainLength = 5
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, td.blockChainLength)

	td.completedRequestChan = make(chan completedRequest, 1)
	td.sentResponses = make(chan sentResponse, td.blockChainLength*2)
	td.sentExtensions = make(chan sentExtension, td.blockChainLength*2)
Hannah Howard's avatar
Hannah Howard committed
844
	td.pausedRequests = make(chan pausedRequest, 1)
845 846 847 848 849 850 851 852
	td.ignoredLinks = make(chan []ipld.Link, 1)
	fprs := &fakePeerResponseSender{
		lastCompletedRequest: td.completedRequestChan,
		sentResponses:        td.sentResponses,
		sentExtensions:       td.sentExtensions,
		pausedRequests:       td.pausedRequests,
		ignoredLinks:         td.ignoredLinks,
	}
853 854 855 856 857 858 859 860 861 862 863 864 865 866
	td.peerManager = &fakePeerManager{peerResponseSender: fprs}
	td.queryQueue = &fakeQueryQueue{}

	td.extensionData = testutil.RandomBytes(100)
	td.extensionName = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionData,
	}
	td.extensionResponseData = testutil.RandomBytes(100)
	td.extensionResponse = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionResponseData,
	}
Hannah Howard's avatar
Hannah Howard committed
867 868 869 870 871
	td.extensionUpdateData = testutil.RandomBytes(100)
	td.extensionUpdate = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionUpdateData,
	}
872 873 874 875
	td.requestID = graphsync.RequestID(rand.Int31())
	td.requests = []gsmsg.GraphSyncRequest{
		gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), td.extension),
	}
Hannah Howard's avatar
Hannah Howard committed
876 877 878
	td.updateRequests = []gsmsg.GraphSyncRequest{
		gsmsg.UpdateRequest(td.requestID, td.extensionUpdate),
	}
879 880
	td.p = testutil.GeneratePeers(1)[0]
	td.peristenceOptions = persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
881 882 883
	td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions)
	td.blockHooks = hooks.NewBlockHooks()
	td.updateHooks = hooks.NewUpdateHooks()
884
	td.completedListeners = hooks.NewCompletedResponseListeners()
885
	return td
886
}