requestmanager_test.go 25.5 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6 7
	"fmt"
	"sync"
8 9 10
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
12
	"github.com/ipfs/go-graphsync/requestmanager/types"
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15 16 17

	"github.com/ipfs/go-graphsync/metadata"

18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19 20 21

	"github.com/ipld/go-ipld-prime"

22 23 24 25 26 27
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
28 29
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
30
}
31

32 33 34 35
// fakePeerHandler is a test double whose SendRequest forwards every call it
// receives onto requestRecordChan so tests can inspect outgoing requests.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

36 37
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
38
	fph.requestRecordChan <- requestRecord{
39 40
		gsr: graphSyncRequest,
		p:   p,
41 42 43
	}
}

44
type requestKey struct {
45
	requestID graphsync.RequestID
46 47 48
	link      ipld.Link
}

49 50 51 52 53
// storeKey pairs a request with the name of the store it asked for. It is the
// key type of fakeAsyncLoader.storesRequested. Field order matters: the file
// builds values with positional literals.
type storeKey struct {
	requestID graphsync.RequestID
	storeName string
}

54 55 56
type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
57
	responses          chan map[graphsync.RequestID]metadata.Metadata
58
	blks               chan []blocks.Block
59 60
	storesRequestedLk  sync.RWMutex
	storesRequested    map[storeKey]struct{}
61 62 63 64 65
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
66
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
67
		blks:             make(chan []blocks.Block, 1),
68
		storesRequested:  make(map[storeKey]struct{}),
69 70
	}
}
71 72 73 74 75 76

// StartRequest notes that requestID asked for the store called name, so a test
// can later confirm it with verifyStoreUsed. It always returns nil.
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name string) error {
	fal.storesRequestedLk.Lock()
	defer fal.storesRequestedLk.Unlock()
	fal.storesRequested[storeKey{requestID, name}] = struct{}{}
	return nil
}
78

79
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
80 81 82 83 84
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
Hannah Howard's avatar
Hannah Howard committed
85 86 87
	var processedBlocks []blocks.Block
	testutil.AssertReceive(ctx, t, fal.blks, &processedBlocks, "did not process blocks")
	require.Equal(t, expectedBlocks, processedBlocks, "did not process correct blocks")
88
}
Hannah Howard's avatar
Hannah Howard committed
89

90
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
91
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
Hannah Howard's avatar
Hannah Howard committed
92 93 94
	var responses map[graphsync.RequestID]metadata.Metadata
	testutil.AssertReceive(ctx, t, fal.responses, &responses, "did not process responses")
	require.Equal(t, expectedResponses, responses, "did not process correct responses")
95 96
}

97
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
98 99
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
Hannah Howard's avatar
Hannah Howard committed
100
		require.NotEqual(t, key.requestID, requestID, "did not clean up request properly")
101 102 103 104
	}
	fal.responseChannelsLk.Unlock()
}

105 106 107 108 109 110 111
// verifyStoreUsed requires that StartRequest was called for requestID with the
// given storeName.
func (fal *fakeAsyncLoader) verifyStoreUsed(t *testing.T, requestID graphsync.RequestID, storeName string) {
	// Release the lock before asserting: a failing require exits through
	// t.FailNow and would never reach an unlock placed after it.
	fal.storesRequestedLk.RLock()
	_, ok := fal.storesRequested[storeKey{requestID, storeName}]
	fal.storesRequestedLk.RUnlock()
	require.True(t, ok, "request should load from correct store")
}

112
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
113 114 115 116 117 118 119 120 121 122
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

123
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
124 125
	return fal.asyncLoad(requestID, link)
}
126 127
// CompleteResponsesFor is a deliberate no-op in this fake.
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
	// Nothing to do: the fake keeps no per-request completion state.
}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
128 129 130 131 132 133 134 135 136
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

137
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
138 139 140 141 142
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

143
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
144 145 146 147 148
	for _, block := range blks {
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Err: nil})
	}
}

149 150 151 152 153 154
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
155 156 157
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
158 159 160 161
	}
	return requestRecords
}

162 163 164 165 166 167 168 169 170 171 172
// metadataForBlocks builds one metadata item per block, in order, each linking
// to the block's CID and carrying the same present flag.
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	items := make(metadata.Metadata, len(blks))
	for i := range blks {
		items[i] = metadata.Item{
			Link:         cidlink.Link{Cid: blks[i].Cid()},
			BlockPresent: present,
		}
	}
	return items
}

173
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
174
	md := metadataForBlocks(blks, present)
175
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
176
	require.NoError(t, err, "did not encode metadata")
177 178
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
179 180
		Data: metadataEncoded,
	}
181 182
}

183 184 185 186
func TestNormalSimultaneousFetch(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
187
	fal := newFakeAsyncLoader()
188
	requestManager := New(ctx, fal)
189 190 191 192 193
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
194
	peers := testutil.GeneratePeers(1)
195

196
	blockStore := make(map[ipld.Link][]byte)
197
	loader, storer := testutil.NewTestStore(blockStore)
198 199
	blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
200

201 202
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
203

204
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
205

Hannah Howard's avatar
Hannah Howard committed
206 207 208 209 210 211
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
	require.Equal(t, maxPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, maxPriority, requestRecords[1].gsr.Priority())
212

Hannah Howard's avatar
Hannah Howard committed
213 214
	require.Equal(t, blockChain1.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
215

216 217
	firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
218
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
219
	require.NoError(t, err, "did not encode metadata")
220
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
221
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
222
	require.NoError(t, err, "did not encode metadata")
223
	firstResponses := []gsmsg.GraphSyncResponse{
224 225
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
226 227
			Data: firstMetadataEncoded1,
		}),
228 229
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
230 231
			Data: firstMetadataEncoded2,
		}),
232 233
	}

234 235
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
236
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
237 238 239
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
240 241
	fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
	fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
242

243 244
	blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
245

246
	moreBlocks := blockChain2.RemainderBlocks(3)
247
	moreMetadata := metadataForBlocks(moreBlocks, true)
248
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
249
	require.NoError(t, err, "did not encode metadata")
250
	moreResponses := []gsmsg.GraphSyncResponse{
251 252
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
253 254
			Data: moreMetadataEncoded,
		}),
255 256
	}

257 258
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
259
	fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
260 261
		requestRecords[1].gsr.ID(): moreMetadata,
	})
262

263 264
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)

265
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
266 267
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
268 269 270 271 272 273
}

func TestCancelRequestInProgress(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
274
	fal := newFakeAsyncLoader()
275
	requestManager := New(ctx, fal)
276 277 278 279 280 281 282
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
283
	peers := testutil.GeneratePeers(1)
284

285
	blockStore := make(map[ipld.Link][]byte)
286
	loader, storer := testutil.NewTestStore(blockStore)
287
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
288

289 290
	returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
291

292
	requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
293

294 295
	firstBlocks := blockChain.Blocks(0, 3)
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
296
	firstResponses := []gsmsg.GraphSyncResponse{
297 298
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
299 300
	}

301
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
302

303 304 305
	fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
306
	cancel1()
307
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
308 309 310

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
311

312 313
	moreBlocks := blockChain.RemainderBlocks(3)
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
314
	moreResponses := []gsmsg.GraphSyncResponse{
315 316
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
317
	}
318
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
319 320
	fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
321

322 323
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
324 325
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
326 327 328 329 330 331 332
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
333
	fal := newFakeAsyncLoader()
334
	requestManager := New(managerCtx, fal)
335 336 337 338
	requestManager.SetDelegate(fph)
	requestManager.Startup()
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
339
	peers := testutil.GeneratePeers(1)
340

341
	blockStore := make(map[ipld.Link][]byte)
342
	loader, storer := testutil.NewTestStore(blockStore)
343
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
344

345
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
346

347
	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
348

349
	firstBlocks := blockChain.Blocks(0, 3)
350
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
351
	firstResponses := []gsmsg.GraphSyncResponse{
352
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
353
	}
354 355
	requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	fal.successResponseOn(rr.gsr.ID(), firstBlocks)
356
	blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
357 358
	managerCancel()

359
	moreBlocks := blockChain.RemainderBlocks(3)
360
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
361
	moreResponses := []gsmsg.GraphSyncResponse{
362
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
363
	}
364 365
	requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	fal.successResponseOn(rr.gsr.ID(), moreBlocks)
366
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
367
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
368 369 370 371 372 373
}

func TestUnencodableSelector(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
374
	fal := newFakeAsyncLoader()
375
	requestManager := New(ctx, fal)
376 377 378 379 380 381 382
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

383 384
	s := testutil.NewUnencodableSelectorSpec()
	r := cidlink.Link{Cid: testutil.GenerateCids(1)[0]}
385
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
386

387 388
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
389
}
390 391 392 393 394

func TestFailedRequest(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
395
	fal := newFakeAsyncLoader()
396
	requestManager := New(ctx, fal)
397 398 399 400 401
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
402
	peers := testutil.GeneratePeers(1)
403

404
	blockStore := make(map[ipld.Link][]byte)
405
	loader, storer := testutil.NewTestStore(blockStore)
406 407 408
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
409 410

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
411
	failedResponses := []gsmsg.GraphSyncResponse{
412
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
413
	}
414
	requestManager.ProcessResponses(peers[0], failedResponses, nil)
415

416 417
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
418 419 420 421 422 423 424
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
425
	requestManager := New(ctx, fal)
426 427 428 429 430 431 432
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

433
	blockStore := make(map[ipld.Link][]byte)
434
	loader, storer := testutil.NewTestStore(blockStore)
435 436 437
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
438 439 440 441

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
442
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
443

444
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
445 446 447

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
448
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
449 450 451
	}

	requestManager.ProcessResponses(peers[0], failedResponses, nil)
452
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
453 454 455 456 457 458 459 460

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
461
	requestManager := New(ctx, fal)
462 463 464 465 466 467 468
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

469
	blockStore := make(map[ipld.Link][]byte)
470
	loader, storer := testutil.NewTestStore(blockStore)
471 472
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
473 474 475 476

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	// async loaded response responds immediately
477
	fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
478

479
	blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
480

481
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
482
	firstResponses := []gsmsg.GraphSyncResponse{
483
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
484
	}
485
	requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
486 487

	fal.verifyNoRemainingData(t, rr.gsr.ID())
488
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
489 490 491 492 493 494 495
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
496
	requestManager := New(ctx, fal)
497 498 499 500 501 502 503
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

504
	blockStore := make(map[ipld.Link][]byte)
505
	loader, storer := testutil.NewTestStore(blockStore)
506 507
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
508 509 510

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

511
	md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
512
	firstResponses := []gsmsg.GraphSyncResponse{
513
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
514 515
	}
	requestManager.ProcessResponses(peers[0], firstResponses, nil)
516
	for _, block := range blockChain.AllBlocks() {
517 518
		fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
	}
519 520
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
521
	require.NotEqual(t, len(errs), 0, "did not send errors")
522
}
523 524 525 526 527 528

func TestEncodingExtensions(t *testing.T) {
	requestRecordChan := make(chan requestRecord, 2)
	fph := &fakePeerHandler{requestRecordChan}
	ctx := context.Background()
	fal := newFakeAsyncLoader()
529
	requestManager := New(ctx, fal)
530 531 532 533 534 535 536
	requestManager.SetDelegate(fph)
	requestManager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

537
	blockStore := make(map[ipld.Link][]byte)
538
	loader, storer := testutil.NewTestStore(blockStore)
539
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
540 541 542 543 544 545 546 547 548 549 550 551 552

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
553 554 555 556 557

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData) error {
		data, has := responseData.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
558
		require.True(t, has, "did not receive extension data in response")
559 560 561
		receivedExtensionData <- data
		return <-expectedError
	}
562
	requestManager.RegisterResponseHook(hook)
563
	returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
564 565 566 567 568

	rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
569 570
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension correctly")
571 572

	returnedData2, found := gsr.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
573 574
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode second extension correctly")
575

576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
		requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
592 593 594
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
		nextExpectedData := testutil.RandomBytes(100)

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
		requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
611 612
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
613 614 615
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
616
}
617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681

// TestOutgoingRequestHooks registers a request hook that, for requests
// carrying the "blockchain" extension, selects the chain's node builder
// chooser and the "chainstore" persistence option. It sends one request with
// the extension and one without, completes both, and checks that the first is
// verified with types and used the "chainstore" store while the second used
// the store with the empty name.
func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
	sentRequests := make(chan requestRecord, 2)
	peerHandler := &fakePeerHandler{requestRecordChan: sentRequests}
	loaderFake := newFakeAsyncLoader()
	manager := New(ctx, loaderFake)
	manager.SetDelegate(peerHandler)
	manager.Startup()

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)
	target := peers[0]

	loader, storer := testutil.NewTestStore(make(map[ipld.Link][]byte))
	chain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)

	chainExtName := graphsync.ExtensionName("blockchain")
	chainExt := graphsync.ExtensionData{Name: chainExtName}

	// Only requests tagged with the extension get the custom chooser and store.
	manager.RegisterRequestHook(func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
		if _, tagged := r.Extension(chainExtName); !tagged {
			return
		}
		ha.UseNodeBuilderChooser(chain.Chooser)
		ha.UsePersistenceOption("chainstore")
	})

	typedResponses, typedErrors := manager.SendRequest(requestCtx, target, chain.TipLink, chain.Selector(), chainExt)
	plainResponses, plainErrors := manager.SendRequest(requestCtx, target, chain.TipLink, chain.Selector())

	sent := readNNetworkRequests(requestCtx, t, sentRequests, 2)

	chainMetadata := metadataForBlocks(chain.AllBlocks(), true)
	encoded, err := metadata.EncodeMetadata(chainMetadata)
	require.NoError(t, err)
	metadataExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: encoded,
	}
	completed := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(sent[0].gsr.ID(), graphsync.RequestCompletedFull, metadataExt),
		gsmsg.NewResponse(sent[1].gsr.ID(), graphsync.RequestCompletedFull, metadataExt),
	}
	manager.ProcessResponses(target, completed, chain.AllBlocks())
	loaderFake.verifyLastProcessedBlocks(ctx, t, chain.AllBlocks())
	loaderFake.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
		sent[0].gsr.ID(): chainMetadata,
		sent[1].gsr.ID(): chainMetadata,
	})
	loaderFake.successResponseOn(sent[0].gsr.ID(), chain.AllBlocks())
	loaderFake.successResponseOn(sent[1].gsr.ID(), chain.AllBlocks())

	chain.VerifyWholeChainWithTypes(requestCtx, typedResponses)
	chain.VerifyWholeChain(requestCtx, plainResponses)
	testutil.VerifyEmptyErrors(ctx, t, typedErrors)
	testutil.VerifyEmptyErrors(ctx, t, plainErrors)
	loaderFake.verifyStoreUsed(t, sent[0].gsr.ID(), "chainstore")
	loaderFake.verifyStoreUsed(t, sent[1].gsr.ID(), "")
}