responsemanager_test.go 35.7 KB
Newer Older
1 2 3 4
package responsemanager

import (
	"context"
5
	"errors"
6 7 8 9 10
	"math/rand"
	"sync"
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12
	gsmsg "github.com/ipfs/go-graphsync/message"
Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/ipfs/go-graphsync/responsemanager/hooks"
14
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
15
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
16
	"github.com/ipfs/go-graphsync/selectorvalidator"
17
	"github.com/ipfs/go-graphsync/testutil"
18
	"github.com/ipfs/go-peertaskqueue/peertask"
19 20
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Hannah Howard's avatar
Hannah Howard committed
21
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
22
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
23
	"github.com/stretchr/testify/require"
24 25 26 27 28
)

type fakeQueryQueue struct {
	popWait   sync.WaitGroup
	queriesLk sync.RWMutex
29
	queries   []*peertask.QueueTask
30 31
}

32
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
33
	fqq.queriesLk.Lock()
34 35 36 37 38 39

	// This isn't quite right as the queue should deduplicate requests, but
	// it's good enough.
	for _, task := range tasks {
		fqq.queries = append(fqq.queries, peertask.NewQueueTask(task, to, time.Now()))
	}
40 41 42
	fqq.queriesLk.Unlock()
}

43
func (fqq *fakeQueryQueue) PopTasks(targetWork int) (peer.ID, []*peertask.Task, int) {
44 45 46 47
	fqq.popWait.Wait()
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	if len(fqq.queries) == 0 {
48
		return "", nil, -1
49
	}
50 51
	// We're not bothering to implement "work"
	task := fqq.queries[0]
52
	fqq.queries = fqq.queries[1:]
53
	return task.Target, []*peertask.Task{&task.Task}, 0
54 55
}

56
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
57 58 59
	fqq.queriesLk.Lock()
	defer fqq.queriesLk.Unlock()
	for i, query := range fqq.queries {
60 61
		if query.Target == p && query.Topic == topic {
			fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
62 63 64 65
		}
	}
}

66 67 68 69
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
	// We don't track active tasks so this is a no-op
}

70 71 72
func (fqq *fakeQueryQueue) ThawRound() {

}
73 74 75 76 77 78 79 80 81 82 83 84

type fakePeerManager struct {
	lastPeer           peer.ID
	peerResponseSender peerresponsemanager.PeerResponseSender
}

func (fpm *fakePeerManager) SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender {
	fpm.lastPeer = p
	return fpm.peerResponseSender
}

type sentResponse struct {
85
	requestID graphsync.RequestID
86 87 88 89
	link      ipld.Link
	data      []byte
}

90 91 92 93 94 95 96 97 98
type sentExtension struct {
	requestID graphsync.RequestID
	extension graphsync.ExtensionData
}

type completedRequest struct {
	requestID graphsync.RequestID
	result    graphsync.ResponseStatusCode
}
Hannah Howard's avatar
Hannah Howard committed
99 100 101 102
type pausedRequest struct {
	requestID graphsync.RequestID
}

103 104
type fakePeerResponseSender struct {
	sentResponses        chan sentResponse
105 106
	sentExtensions       chan sentExtension
	lastCompletedRequest chan completedRequest
Hannah Howard's avatar
Hannah Howard committed
107
	pausedRequests       chan pausedRequest
108 109 110 111 112
}

func (fprs *fakePeerResponseSender) Startup()  {}
func (fprs *fakePeerResponseSender) Shutdown() {}

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
type fakeBlkData struct {
	link ipld.Link
	size uint64
}

func (fbd fakeBlkData) Link() ipld.Link {
	return fbd.link
}

func (fbd fakeBlkData) BlockSize() uint64 {
	return fbd.size
}

func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
	return fbd.size
}

130
func (fprs *fakePeerResponseSender) SendResponse(
131
	requestID graphsync.RequestID,
132 133
	link ipld.Link,
	data []byte,
134
) graphsync.BlockData {
135
	fprs.sentResponses <- sentResponse{requestID, link, data}
136
	return fakeBlkData{link, uint64(len(data))}
137 138
}

139 140 141 142 143 144 145
func (fprs *fakePeerResponseSender) SendExtensionData(
	requestID graphsync.RequestID,
	extension graphsync.ExtensionData,
) {
	fprs.sentExtensions <- sentExtension{requestID, extension}
}

146
func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
147
	fprs.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull}
148
	return graphsync.RequestCompletedFull
149 150
}

151
func (fprs *fakePeerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) {
152
	fprs.lastCompletedRequest <- completedRequest{requestID, status}
153 154
}

Hannah Howard's avatar
Hannah Howard committed
155 156 157 158
func (fprs *fakePeerResponseSender) PauseRequest(requestID graphsync.RequestID) {
	fprs.pausedRequests <- pausedRequest{requestID}
}

Hannah Howard's avatar
Hannah Howard committed
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
func (fprs *fakePeerResponseSender) Transaction(requestID graphsync.RequestID, transaction peerresponsemanager.Transaction) error {
	fprts := &fakePeerResponseTransactionSender{requestID, fprs}
	return transaction(fprts)
}

type fakePeerResponseTransactionSender struct {
	requestID graphsync.RequestID
	prs       peerresponsemanager.PeerResponseSender
}

func (fprts *fakePeerResponseTransactionSender) SendResponse(link ipld.Link, data []byte) graphsync.BlockData {
	return fprts.prs.SendResponse(fprts.requestID, link, data)
}

func (fprts *fakePeerResponseTransactionSender) SendExtensionData(extension graphsync.ExtensionData) {
	fprts.prs.SendExtensionData(fprts.requestID, extension)
}

func (fprts *fakePeerResponseTransactionSender) FinishRequest() graphsync.ResponseStatusCode {
	return fprts.prs.FinishRequest(fprts.requestID)
}

func (fprts *fakePeerResponseTransactionSender) FinishWithError(status graphsync.ResponseStatusCode) {
	fprts.prs.FinishWithError(fprts.requestID, status)
}

func (fprts *fakePeerResponseTransactionSender) PauseRequest() {
	fprts.prs.PauseRequest(fprts.requestID)
}

189
func TestIncomingQuery(t *testing.T) {
190 191 192 193
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()

194
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
195
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
196 197
	responseManager.Startup()

198 199
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
	testutil.AssertDoesReceive(td.ctx, t, td.completedRequestChan, "Should have completed request but didn't")
200
	for i := 0; i < len(blks); i++ {
Hannah Howard's avatar
Hannah Howard committed
201
		var sentResponse sentResponse
202
		testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send responses")
Hannah Howard's avatar
Hannah Howard committed
203 204 205 206
		k := sentResponse.link.(cidlink.Link)
		blockIndex := testutil.IndexOf(blks, k.Cid)
		require.NotEqual(t, blockIndex, -1, "sent incorrect link")
		require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
207
		require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
208 209 210 211
	}
}

func TestCancellationQueryInProgress(t *testing.T) {
212 213 214
	td := newTestData(t)
	defer td.cancel()
	blks := td.blockChain.AllBlocks()
215
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
216
	td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
217
	responseManager.Startup()
218
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
219 220

	// read one block
Hannah Howard's avatar
Hannah Howard committed
221
	var sentResponse sentResponse
222
	testutil.AssertReceive(td.ctx, t, td.sentResponses, &sentResponse, "did not send response")
Hannah Howard's avatar
Hannah Howard committed
223 224 225 226
	k := sentResponse.link.(cidlink.Link)
	blockIndex := testutil.IndexOf(blks, k.Cid)
	require.NotEqual(t, blockIndex, -1, "sent incorrect link")
	require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
227
	require.Equal(t, td.requestID, sentResponse.requestID, "has incorrect response id")
228 229

	// send a cancellation
230 231
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
232
	}
233
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
234 235 236

	responseManager.synchronize()

237 238
	// at this point we should receive at most one more block, then traversal
	// should complete
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
	additionalBlocks := 0
	for {
		select {
		case <-td.ctx.Done():
			t.Fatal("should complete request before context closes")
		case sentResponse = <-td.sentResponses:
			k = sentResponse.link.(cidlink.Link)
			blockIndex = testutil.IndexOf(blks, k.Cid)
			require.NotEqual(t, blockIndex, -1, "did not send correct link")
			require.Equal(t, blks[blockIndex].RawData(), sentResponse.data, "sent incorrect data")
			require.Equal(t, td.requestID, sentResponse.requestID, "incorrect response id")
			additionalBlocks++
		case <-td.completedRequestChan:
			require.LessOrEqual(t, additionalBlocks, 1, "should send at most 1 additional block")
			return
		}
	}
256 257 258
}

func TestEarlyCancellation(t *testing.T) {
259 260 261
	td := newTestData(t)
	defer td.cancel()
	td.queryQueue.popWait.Add(1)
262
	responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
263
	responseManager.Startup()
264
	responseManager.ProcessRequests(td.ctx, td.p, td.requests)
265 266

	// send a cancellation
267 268
	cancelRequests := []gsmsg.GraphSyncRequest{
		gsmsg.CancelRequest(td.requestID),
269
	}
270
	responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
271 272 273 274

	responseManager.synchronize()

	// unblock popping from queue
275
	td.queryQueue.popWait.Done()
276

277
	timer := time.NewTimer(time.Second)
278
	// verify no responses processed
279
	testutil.AssertDoesReceiveFirst(t, timer.C, "should not process more responses", td.sentResponses, td.completedRequestChan)
280
}
281 282

func TestValidationAndExtensions(t *testing.T) {
283
	t.Run("on its own, should fail validation", func(t *testing.T) {
284 285
		td := newTestData(t)
		defer td.cancel()
286
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
287
		responseManager.Startup()
288
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
289
		var lastRequest completedRequest
290
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
291 292 293 294
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
	})

	t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) {
295 296
		td := newTestData(t)
		defer td.cancel()
297
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
298
		responseManager.Startup()
299 300
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
301
		})
302
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
303
		var lastRequest completedRequest
304
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
305 306
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
307 308
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
309
	})
310

311
	t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) {
312 313
		td := newTestData(t)
		defer td.cancel()
314
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
315
		responseManager.Startup()
316
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
317
			hookActions.ValidateRequest()
318
			hookActions.SendExtensionData(td.extensionResponse)
319
		})
320
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
321
		var lastRequest completedRequest
322
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
323 324
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
325 326
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
327 328
	})

329
	t.Run("if any hook fails, should fail", func(t *testing.T) {
330 331
		td := newTestData(t)
		defer td.cancel()
332
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
333
		responseManager.Startup()
334
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
335
			hookActions.ValidateRequest()
336
		})
337 338
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.SendExtensionData(td.extensionResponse)
339 340
			hookActions.TerminateWithError(errors.New("everything went to crap"))
		})
341
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
342
		var lastRequest completedRequest
343
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
344 345
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
		var receivedExtension sentExtension
346 347
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
348
	})
349

350
	t.Run("hooks can be unregistered", func(t *testing.T) {
351 352
		td := newTestData(t)
		defer td.cancel()
353
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
354
		responseManager.Startup()
355
		unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
356
			hookActions.ValidateRequest()
357
			hookActions.SendExtensionData(td.extensionResponse)
358
		})
359 360

		// hook validates request
361
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
362
		var lastRequest completedRequest
363
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
364 365
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
366 367
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
368 369 370 371 372

		// unregister
		unregister()

		// no same request should fail
373 374
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
375
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")
376
	})
377 378

	t.Run("hooks can alter the loader", func(t *testing.T) {
379 380
		td := newTestData(t)
		defer td.cancel()
381 382
		obs := make(map[ipld.Link][]byte)
		oloader, _ := testutil.NewTestStore(obs)
383
		responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
384 385
		responseManager.Startup()
		// add validating hook -- so the request SHOULD succeed
386
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
387 388 389 390 391
			hookActions.ValidateRequest()
		})

		// request fails with base loader reading from block store that's missing data
		var lastRequest completedRequest
392 393
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
394 395
		require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure")

396
		err := td.peristenceOptions.Register("chainstore", td.loader)
397
		require.NoError(t, err)
398
		// register hook to use different loader
399 400
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
401
				hookActions.UsePersistenceOption("chainstore")
402
				hookActions.SendExtensionData(td.extensionResponse)
403 404 405
			}
		})
		// hook uses different loader that should make request succeed
406 407
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
408 409
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
410 411
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
412 413 414
	})

	t.Run("hooks can alter the node builder chooser", func(t *testing.T) {
415 416
		td := newTestData(t)
		defer td.cancel()
417
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
418 419 420
		responseManager.Startup()

		customChooserCallCount := 0
Hannah Howard's avatar
Hannah Howard committed
421
		customChooser := func(ipld.Link, ipld.LinkContext) (ipld.NodeStyle, error) {
422
			customChooserCallCount++
Hannah Howard's avatar
Hannah Howard committed
423
			return basicnode.Style.Any, nil
424 425 426
		}

		// add validating hook -- so the request SHOULD succeed
427
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
428 429 430 431 432
			hookActions.ValidateRequest()
		})

		// with default chooser, customer chooser not called
		var lastRequest completedRequest
433 434
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
435 436 437 438
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		require.Equal(t, 0, customChooserCallCount)

		// register hook to use custom chooser
439 440
		_ = td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			if _, found := requestData.Extension(td.extensionName); found {
Hannah Howard's avatar
Hannah Howard committed
441
				hookActions.UseLinkTargetNodeStyleChooser(customChooser)
442
				hookActions.SendExtensionData(td.extensionResponse)
443 444 445 446
			}
		})

		// verify now that request succeeds and uses custom chooser
447 448
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
449 450
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var receivedExtension sentExtension
451 452
		testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
		require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
453 454
		require.Equal(t, 5, customChooserCallCount)
	})
455 456 457 458 459

	t.Run("test block hook processing", func(t *testing.T) {
		t.Run("can send extension data", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
460
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.SendExtensionData(td.extensionResponse)
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
			for i := 0; i < td.blockChainLength; i++ {
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			}
		})

		t.Run("can send errors", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
482
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
483 484 485 486 487 488 489 490 491 492
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
				hookActions.TerminateWithError(errors.New("failed"))
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
Hannah Howard's avatar
Hannah Howard committed
493
			require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
494 495 496 497 498
		})

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
499
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
500 501 502 503
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
Hannah Howard's avatar
Hannah Howard committed
504
			blkIndex := 0
505 506
			blockCount := 3
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
Hannah Howard's avatar
Hannah Howard committed
507 508
				blkIndex++
				if blkIndex == blockCount {
509 510
					hookActions.PauseResponse()
				}
Hannah Howard's avatar
Hannah Howard committed
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			for i := 0; i < blockCount; i++ {
				testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
			}
			testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
			var pausedRequest pausedRequest
			testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
			err := responseManager.UnpauseResponse(td.p, td.requestID)
			require.NoError(t, err)
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

	})

	t.Run("test update hook processing", func(t *testing.T) {

		t.Run("can pause/unpause", func(t *testing.T) {
			td := newTestData(t)
			defer td.cancel()
535
			responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
536 537 538 539 540 541 542
			responseManager.Startup()
			td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
				hookActions.ValidateRequest()
			})
			blkIndex := 0
			blockCount := 3
			td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
543
				blkIndex++
Hannah Howard's avatar
Hannah Howard committed
544 545 546 547 548 549 550 551
				if blkIndex == blockCount {
					hookActions.PauseResponse()
				}
			})
			td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
				if _, found := updateData.Extension(td.extensionName); found {
					hookActions.UnpauseResponse()
				}
552 553 554 555 556
			})
			responseManager.ProcessRequests(td.ctx, td.p, td.requests)
			timer := time.NewTimer(500 * time.Millisecond)
			testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			var sentResponses []sentResponse
Hannah Howard's avatar
Hannah Howard committed
557 558
			for i := 0; i < blockCount; i++ {
				testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
559
			}
Hannah Howard's avatar
Hannah Howard committed
560 561 562
			testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
			var pausedRequest pausedRequest
			testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
563
			require.LessOrEqual(t, len(sentResponses), blockCount)
Hannah Howard's avatar
Hannah Howard committed
564
			responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
565 566 567 568 569
			var lastRequest completedRequest
			testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
			require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		})

Hannah Howard's avatar
Hannah Howard committed
570 571 572 573
		t.Run("can send extension data", func(t *testing.T) {
			t.Run("when unpaused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
574
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				wait := make(chan struct{})
				sent := make(chan struct{})
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						close(sent)
						<-wait
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.SendExtensionData(td.extensionResponse)
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
				responseManager.synchronize()
				close(wait)
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
				require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
			})

			t.Run("when paused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
611
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						hookActions.PauseResponse()
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.SendExtensionData(td.extensionResponse)
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				var sentResponses []sentResponse
				for i := 0; i < blockCount; i++ {
					testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
				}
				testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
				var pausedRequest pausedRequest
				testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
				require.LessOrEqual(t, len(sentResponses), blockCount)

				// send update
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

				// receive data
				var receivedExtension sentExtension
				testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")

				// should still be paused
				timer := time.NewTimer(500 * time.Millisecond)
				testutil.AssertDoesReceiveFirst(t, timer.C, "should not complete request while paused", td.completedRequestChan)
			})
		})

		t.Run("can send errors", func(t *testing.T) {
			t.Run("when unpaused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
656
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				wait := make(chan struct{})
				sent := make(chan struct{})
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						close(sent)
						<-wait
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.TerminateWithError(errors.New("something went wrong"))
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
				responseManager.synchronize()
				close(wait)
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")
			})

			t.Run("when paused", func(t *testing.T) {
				td := newTestData(t)
				defer td.cancel()
690
				responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
Hannah Howard's avatar
Hannah Howard committed
691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731
				responseManager.Startup()
				td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
					hookActions.ValidateRequest()
				})
				blkIndex := 0
				blockCount := 3
				td.blockHooks.Register(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
					blkIndex++
					if blkIndex == blockCount {
						hookActions.PauseResponse()
					}
				})
				td.updateHooks.Register(func(p peer.ID, requestData graphsync.RequestData, updateData graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
					if _, found := updateData.Extension(td.extensionName); found {
						hookActions.TerminateWithError(errors.New("something went wrong"))
					}
				})
				responseManager.ProcessRequests(td.ctx, td.p, td.requests)
				var sentResponses []sentResponse
				for i := 0; i < blockCount; i++ {
					testutil.AssertDoesReceive(td.ctx, t, td.sentResponses, "should sent block")
				}
				testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks")
				var pausedRequest pausedRequest
				testutil.AssertReceive(td.ctx, t, td.pausedRequests, &pausedRequest, "should pause request")
				require.LessOrEqual(t, len(sentResponses), blockCount)

				// send update
				responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)

				// should terminate
				var lastRequest completedRequest
				testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
				require.True(t, gsmsg.IsTerminalFailureCode(lastRequest.result), "request should fail")

				// cannot unpause
				err := responseManager.UnpauseResponse(td.p, td.requestID)
				require.Error(t, err)
			})
		})

732
	})
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755
	t.Run("final response status listeners", func(t *testing.T) {
		td := newTestData(t)
		defer td.cancel()
		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
		responseManager.Startup()
		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
			hookActions.ValidateRequest()
		})
		statusChan := make(chan graphsync.ResponseStatusCode, 1)
		td.completedListeners.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode) {
			select {
			case statusChan <- status:
			default:
			}
		})
		responseManager.ProcessRequests(td.ctx, td.p, td.requests)
		var lastRequest completedRequest
		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
		var status graphsync.ResponseStatusCode
		testutil.AssertReceive(td.ctx, t, statusChan, &status, "should receive status")
		require.True(t, gsmsg.IsTerminalSuccessCode(status), "request should succeed")
	})
756 757 758 759 760 761 762 763 764 765 766 767 768
}

type testData struct {
	ctx                   context.Context
	cancel                func()
	blockStore            map[ipld.Link][]byte
	loader                ipld.Loader
	storer                ipld.Storer
	blockChainLength      int
	blockChain            *testutil.TestBlockChain
	completedRequestChan  chan completedRequest
	sentResponses         chan sentResponse
	sentExtensions        chan sentExtension
Hannah Howard's avatar
Hannah Howard committed
769
	pausedRequests        chan pausedRequest
770 771 772 773 774 775 776
	peerManager           *fakePeerManager
	queryQueue            *fakeQueryQueue
	extensionData         []byte
	extensionName         graphsync.ExtensionName
	extension             graphsync.ExtensionData
	extensionResponseData []byte
	extensionResponse     graphsync.ExtensionData
Hannah Howard's avatar
Hannah Howard committed
777 778
	extensionUpdateData   []byte
	extensionUpdate       graphsync.ExtensionData
779 780
	requestID             graphsync.RequestID
	requests              []gsmsg.GraphSyncRequest
Hannah Howard's avatar
Hannah Howard committed
781
	updateRequests        []gsmsg.GraphSyncRequest
782 783
	p                     peer.ID
	peristenceOptions     *persistenceoptions.PersistenceOptions
Hannah Howard's avatar
Hannah Howard committed
784 785 786
	requestHooks          *hooks.IncomingRequestHooks
	blockHooks            *hooks.OutgoingBlockHooks
	updateHooks           *hooks.RequestUpdatedHooks
787
	completedListeners    *hooks.CompletedResponseListeners
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
}

func newTestData(t *testing.T) testData {
	ctx := context.Background()
	td := testData{}
	td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second)

	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChainLength = 5
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, td.blockChainLength)

	td.completedRequestChan = make(chan completedRequest, 1)
	td.sentResponses = make(chan sentResponse, td.blockChainLength*2)
	td.sentExtensions = make(chan sentExtension, td.blockChainLength*2)
Hannah Howard's avatar
Hannah Howard committed
803 804
	td.pausedRequests = make(chan pausedRequest, 1)
	fprs := &fakePeerResponseSender{lastCompletedRequest: td.completedRequestChan, sentResponses: td.sentResponses, sentExtensions: td.sentExtensions, pausedRequests: td.pausedRequests}
805 806 807 808 809 810 811 812 813 814 815 816 817 818
	td.peerManager = &fakePeerManager{peerResponseSender: fprs}
	td.queryQueue = &fakeQueryQueue{}

	td.extensionData = testutil.RandomBytes(100)
	td.extensionName = graphsync.ExtensionName("AppleSauce/McGee")
	td.extension = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionData,
	}
	td.extensionResponseData = testutil.RandomBytes(100)
	td.extensionResponse = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionResponseData,
	}
Hannah Howard's avatar
Hannah Howard committed
819 820 821 822 823
	td.extensionUpdateData = testutil.RandomBytes(100)
	td.extensionUpdate = graphsync.ExtensionData{
		Name: td.extensionName,
		Data: td.extensionUpdateData,
	}
824 825 826 827
	td.requestID = graphsync.RequestID(rand.Int31())
	td.requests = []gsmsg.GraphSyncRequest{
		gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), td.extension),
	}
Hannah Howard's avatar
Hannah Howard committed
828 829 830
	td.updateRequests = []gsmsg.GraphSyncRequest{
		gsmsg.UpdateRequest(td.requestID, td.extensionUpdate),
	}
831 832
	td.p = testutil.GeneratePeers(1)[0]
	td.peristenceOptions = persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
833 834 835
	td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions)
	td.blockHooks = hooks.NewBlockHooks()
	td.updateHooks = hooks.NewUpdateHooks()
836
	td.completedListeners = hooks.NewCompletedResponseListeners()
837
	return td
838
}