requestmanager_test.go 31.9 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6 7
	"fmt"
	"sync"
8 9 10
	"testing"
	"time"

11
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
12
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
13
	"github.com/ipfs/go-graphsync/requestmanager/types"
14
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
15
	"github.com/stretchr/testify/require"
16 17 18

	"github.com/ipfs/go-graphsync/metadata"

19
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
20 21 22

	"github.com/ipld/go-ipld-prime"

23 24 25 26 27 28
	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
)

type requestRecord struct {
29 30
	gsr gsmsg.GraphSyncRequest
	p   peer.ID
31
}
32

33 34 35 36
// fakePeerHandler is a test double for the component that delivers requests
// to peers. Instead of sending anything, it publishes every request it is
// given on requestRecordChan so tests can inspect it.
type fakePeerHandler struct {
	requestRecordChan chan requestRecord
}

37 38
func (fph *fakePeerHandler) SendRequest(p peer.ID,
	graphSyncRequest gsmsg.GraphSyncRequest) {
39
	fph.requestRecordChan <- requestRecord{
40 41
		gsr: graphSyncRequest,
		p:   p,
42 43 44
	}
}

45
type requestKey struct {
46
	requestID graphsync.RequestID
47 48 49
	link      ipld.Link
}

50 51 52 53 54
// storeKey pairs a request with the name of the store it asked for. It is
// the map key for fakeAsyncLoader.storesRequested.
type storeKey struct {
	requestID graphsync.RequestID
	storeName string
}

55 56 57
type fakeAsyncLoader struct {
	responseChannelsLk sync.RWMutex
	responseChannels   map[requestKey]chan types.AsyncLoadResult
58
	responses          chan map[graphsync.RequestID]metadata.Metadata
59
	blks               chan []blocks.Block
60 61
	storesRequestedLk  sync.RWMutex
	storesRequested    map[storeKey]struct{}
62 63 64 65 66
}

func newFakeAsyncLoader() *fakeAsyncLoader {
	return &fakeAsyncLoader{
		responseChannels: make(map[requestKey]chan types.AsyncLoadResult),
67
		responses:        make(chan map[graphsync.RequestID]metadata.Metadata, 1),
68
		blks:             make(chan []blocks.Block, 1),
69
		storesRequested:  make(map[storeKey]struct{}),
70 71
	}
}
72 73 74 75 76 77

// StartRequest remembers that requestID asked for the store called name, so
// tests can check it later with verifyStoreUsed. It always returns nil.
func (fal *fakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name string) error {
	fal.storesRequestedLk.Lock()
	defer fal.storesRequestedLk.Unlock()
	fal.storesRequested[storeKey{requestID, name}] = struct{}{}
	return nil
}
79

80
func (fal *fakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
81 82 83 84 85
	blks []blocks.Block) {
	fal.responses <- responses
	fal.blks <- blks
}
func (fal *fakeAsyncLoader) verifyLastProcessedBlocks(ctx context.Context, t *testing.T, expectedBlocks []blocks.Block) {
Hannah Howard's avatar
Hannah Howard committed
86 87 88
	var processedBlocks []blocks.Block
	testutil.AssertReceive(ctx, t, fal.blks, &processedBlocks, "did not process blocks")
	require.Equal(t, expectedBlocks, processedBlocks, "did not process correct blocks")
89
}
Hannah Howard's avatar
Hannah Howard committed
90

91
func (fal *fakeAsyncLoader) verifyLastProcessedResponses(ctx context.Context, t *testing.T,
92
	expectedResponses map[graphsync.RequestID]metadata.Metadata) {
Hannah Howard's avatar
Hannah Howard committed
93 94 95
	var responses map[graphsync.RequestID]metadata.Metadata
	testutil.AssertReceive(ctx, t, fal.responses, &responses, "did not process responses")
	require.Equal(t, expectedResponses, responses, "did not process correct responses")
96 97
}

98
func (fal *fakeAsyncLoader) verifyNoRemainingData(t *testing.T, requestID graphsync.RequestID) {
99 100
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
Hannah Howard's avatar
Hannah Howard committed
101
		require.NotEqual(t, key.requestID, requestID, "did not clean up request properly")
102 103 104 105
	}
	fal.responseChannelsLk.Unlock()
}

106 107 108 109 110 111 112
// verifyStoreUsed requires that StartRequest was called for requestID with
// the given storeName.
func (fal *fakeAsyncLoader) verifyStoreUsed(t *testing.T, requestID graphsync.RequestID, storeName string) {
	t.Helper()
	// Deferred so the read lock is released even when the require below
	// fails and exits through t.FailNow.
	fal.storesRequestedLk.RLock()
	defer fal.storesRequestedLk.RUnlock()
	_, ok := fal.storesRequested[storeKey{requestID, storeName}]
	require.True(t, ok, "request should load from correct store")
}

113
func (fal *fakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult {
114 115 116 117 118 119 120 121 122 123
	fal.responseChannelsLk.Lock()
	responseChannel, ok := fal.responseChannels[requestKey{requestID, link}]
	if !ok {
		responseChannel = make(chan types.AsyncLoadResult, 1)
		fal.responseChannels[requestKey{requestID, link}] = responseChannel
	}
	fal.responseChannelsLk.Unlock()
	return responseChannel
}

124
func (fal *fakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
125 126
	return fal.asyncLoad(requestID, link)
}
127 128
// CompleteResponsesFor is a no-op in this fake.
func (fal *fakeAsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {}
func (fal *fakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
129 130 131 132 133 134 135 136 137
	fal.responseChannelsLk.Lock()
	for key := range fal.responseChannels {
		if key.requestID == requestID {
			delete(fal.responseChannels, key)
		}
	}
	fal.responseChannelsLk.Unlock()
}

138
func (fal *fakeAsyncLoader) responseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) {
139 140 141 142 143
	responseChannel := fal.asyncLoad(requestID, link)
	responseChannel <- result
	close(responseChannel)
}

144
func (fal *fakeAsyncLoader) successResponseOn(requestID graphsync.RequestID, blks []blocks.Block) {
145
	for _, block := range blks {
Hannah Howard's avatar
Hannah Howard committed
146
		fal.responseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Local: false, Err: nil})
147 148 149
	}
}

150 151 152 153 154 155
func readNNetworkRequests(ctx context.Context,
	t *testing.T,
	requestRecordChan <-chan requestRecord,
	count int) []requestRecord {
	requestRecords := make([]requestRecord, 0, count)
	for i := 0; i < count; i++ {
Hannah Howard's avatar
Hannah Howard committed
156 157 158
		var rr requestRecord
		testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
		requestRecords = append(requestRecords, rr)
159 160 161 162
	}
	return requestRecords
}

163 164 165 166 167 168 169 170 171 172 173
// metadataForBlocks builds one metadata item per block, in order. Each item
// links to the block's CID and carries the same BlockPresent flag.
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
	items := make(metadata.Metadata, len(blks))
	for i, blk := range blks {
		items[i] = metadata.Item{
			Link:         cidlink.Link{Cid: blk.Cid()},
			BlockPresent: present,
		}
	}
	return items
}

174
func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData {
175
	md := metadataForBlocks(blks, present)
176
	metadataEncoded, err := metadata.EncodeMetadata(md)
Hannah Howard's avatar
Hannah Howard committed
177
	require.NoError(t, err, "did not encode metadata")
178 179
	return graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
180 181
		Data: metadataEncoded,
	}
182 183
}

184 185
func TestNormalSimultaneousFetch(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
186
	td := newTestData(ctx, t)
187 188 189

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
190
	peers := testutil.GeneratePeers(1)
191

Hannah Howard's avatar
Hannah Howard committed
192
	blockChain2 := testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
193

Hannah Howard's avatar
Hannah Howard committed
194 195
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
196

Hannah Howard's avatar
Hannah Howard committed
197
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
198

Hannah Howard's avatar
Hannah Howard committed
199 200 201 202
	require.Equal(t, peers[0], requestRecords[0].p)
	require.Equal(t, peers[0], requestRecords[1].p)
	require.False(t, requestRecords[0].gsr.IsCancel())
	require.False(t, requestRecords[1].gsr.IsCancel())
203 204
	require.Equal(t, defaultPriority, requestRecords[0].gsr.Priority())
	require.Equal(t, defaultPriority, requestRecords[1].gsr.Priority())
205

Hannah Howard's avatar
Hannah Howard committed
206
	require.Equal(t, td.blockChain.Selector(), requestRecords[0].gsr.Selector(), "did not encode selector properly")
Hannah Howard's avatar
Hannah Howard committed
207
	require.Equal(t, blockChain2.Selector(), requestRecords[1].gsr.Selector(), "did not encode selector properly")
208

Hannah Howard's avatar
Hannah Howard committed
209 210
	firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...)
	firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true)
211
	firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
Hannah Howard's avatar
Hannah Howard committed
212
	require.NoError(t, err, "did not encode metadata")
213
	firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
214
	firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
Hannah Howard's avatar
Hannah Howard committed
215
	require.NoError(t, err, "did not encode metadata")
216
	firstResponses := []gsmsg.GraphSyncResponse{
217 218
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
219 220
			Data: firstMetadataEncoded1,
		}),
221 222
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
223 224
			Data: firstMetadataEncoded2,
		}),
225 226
	}

Hannah Howard's avatar
Hannah Howard committed
227 228 229
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	td.fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
	td.fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
230 231 232
		requestRecords[0].gsr.ID(): firstMetadata1,
		requestRecords[1].gsr.ID(): firstMetadata2,
	})
Hannah Howard's avatar
Hannah Howard committed
233 234
	td.fal.successResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
235

Hannah Howard's avatar
Hannah Howard committed
236
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1)
237
	blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
238

239
	moreBlocks := blockChain2.RemainderBlocks(3)
240
	moreMetadata := metadataForBlocks(moreBlocks, true)
241
	moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
Hannah Howard's avatar
Hannah Howard committed
242
	require.NoError(t, err, "did not encode metadata")
243
	moreResponses := []gsmsg.GraphSyncResponse{
244 245
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{
			Name: graphsync.ExtensionMetadata,
246 247
			Data: moreMetadataEncoded,
		}),
248 249
	}

Hannah Howard's avatar
Hannah Howard committed
250 251 252
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	td.fal.verifyLastProcessedBlocks(ctx, t, moreBlocks)
	td.fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
253 254
		requestRecords[1].gsr.ID(): moreMetadata,
	})
255

Hannah Howard's avatar
Hannah Howard committed
256
	td.fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
257

258
	blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
259 260
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
261 262 263 264
}

func TestCancelRequestInProgress(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
265
	td := newTestData(ctx, t)
266 267 268 269 270
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	requestCtx1, cancel1 := context.WithCancel(requestCtx)
	requestCtx2, cancel2 := context.WithCancel(requestCtx)
	defer cancel2()
271
	peers := testutil.GeneratePeers(1)
272

Hannah Howard's avatar
Hannah Howard committed
273 274
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
275

Hannah Howard's avatar
Hannah Howard committed
276
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
277

Hannah Howard's avatar
Hannah Howard committed
278
	firstBlocks := td.blockChain.Blocks(0, 3)
279
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
280
	firstResponses := []gsmsg.GraphSyncResponse{
281 282
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
283 284
	}

Hannah Howard's avatar
Hannah Howard committed
285
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
286

Hannah Howard's avatar
Hannah Howard committed
287 288 289
	td.fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
	td.fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
	td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
290
	cancel1()
Hannah Howard's avatar
Hannah Howard committed
291
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
292 293 294

	require.True(t, rr.gsr.IsCancel())
	require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
295

Hannah Howard's avatar
Hannah Howard committed
296
	moreBlocks := td.blockChain.RemainderBlocks(3)
297
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
298
	moreResponses := []gsmsg.GraphSyncResponse{
299 300
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
301
	}
Hannah Howard's avatar
Hannah Howard committed
302 303 304
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	td.fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
	td.fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
305

306
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
Hannah Howard's avatar
Hannah Howard committed
307
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
308 309
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
310 311 312 313 314
}

func TestCancelManagerExitsGracefully(t *testing.T) {
	ctx := context.Background()
	managerCtx, managerCancel := context.WithCancel(ctx)
Hannah Howard's avatar
Hannah Howard committed
315
	td := newTestData(managerCtx, t)
316 317
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
318
	peers := testutil.GeneratePeers(1)
319

Hannah Howard's avatar
Hannah Howard committed
320
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
321

Hannah Howard's avatar
Hannah Howard committed
322
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
323

Hannah Howard's avatar
Hannah Howard committed
324
	firstBlocks := td.blockChain.Blocks(0, 3)
325
	firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
326
	firstResponses := []gsmsg.GraphSyncResponse{
327
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
328
	}
Hannah Howard's avatar
Hannah Howard committed
329 330 331
	td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
	td.fal.successResponseOn(rr.gsr.ID(), firstBlocks)
	td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
332 333
	managerCancel()

Hannah Howard's avatar
Hannah Howard committed
334
	moreBlocks := td.blockChain.RemainderBlocks(3)
335
	moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
336
	moreResponses := []gsmsg.GraphSyncResponse{
337
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
338
	}
Hannah Howard's avatar
Hannah Howard committed
339 340
	td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
	td.fal.successResponseOn(rr.gsr.ID(), moreBlocks)
341
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
342
	testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
343 344
}

345 346
func TestFailedRequest(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
347
	td := newTestData(ctx, t)
348 349
	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
350
	peers := testutil.GeneratePeers(1)
351

Hannah Howard's avatar
Hannah Howard committed
352
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
353

Hannah Howard's avatar
Hannah Howard committed
354
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
355
	failedResponses := []gsmsg.GraphSyncResponse{
356
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
357
	}
Hannah Howard's avatar
Hannah Howard committed
358
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
359

360 361
	testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
	testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
362 363 364 365
}

func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
366
	td := newTestData(ctx, t)
367 368 369 370 371

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
372
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
373

Hannah Howard's avatar
Hannah Howard committed
374
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
375 376

	// async loaded response responds immediately
Hannah Howard's avatar
Hannah Howard committed
377
	td.fal.successResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
378

Hannah Howard's avatar
Hannah Howard committed
379
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
380 381 382

	// failure comes in later over network
	failedResponses := []gsmsg.GraphSyncResponse{
383
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound),
384 385
	}

Hannah Howard's avatar
Hannah Howard committed
386
	td.requestManager.ProcessResponses(peers[0], failedResponses, nil)
387
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
388 389 390 391 392

}

func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
393
	td := newTestData(ctx, t)
394 395 396 397 398

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
399
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
400

Hannah Howard's avatar
Hannah Howard committed
401
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
402 403

	// async loaded response responds immediately
Hannah Howard's avatar
Hannah Howard committed
404
	td.fal.successResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks())
405

Hannah Howard's avatar
Hannah Howard committed
406
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
407

Hannah Howard's avatar
Hannah Howard committed
408
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true)
409
	firstResponses := []gsmsg.GraphSyncResponse{
410
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
411
	}
Hannah Howard's avatar
Hannah Howard committed
412
	td.requestManager.ProcessResponses(peers[0], firstResponses, td.blockChain.AllBlocks())
413

Hannah Howard's avatar
Hannah Howard committed
414
	td.fal.verifyNoRemainingData(t, rr.gsr.ID())
415
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
416 417 418 419
}

func TestRequestReturnsMissingBlocks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
420
	td := newTestData(ctx, t)
421 422 423 424 425

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

Hannah Howard's avatar
Hannah Howard committed
426
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
427

Hannah Howard's avatar
Hannah Howard committed
428
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
429

Hannah Howard's avatar
Hannah Howard committed
430
	md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
431
	firstResponses := []gsmsg.GraphSyncResponse{
432
		gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
433
	}
Hannah Howard's avatar
Hannah Howard committed
434 435 436
	td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
	for _, block := range td.blockChain.AllBlocks() {
		td.fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
437
	}
438 439
	testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
	errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
Hannah Howard's avatar
Hannah Howard committed
440
	require.NotEqual(t, len(errs), 0, "did not send errors")
441
}
442 443 444

func TestEncodingExtensions(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
445
	td := newTestData(ctx, t)
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
463 464 465

	expectedError := make(chan error, 2)
	receivedExtensionData := make(chan []byte, 2)
Hannah Howard's avatar
Hannah Howard committed
466 467
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 2)
	hook := func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
468
		data, has := responseData.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
469
		require.True(t, has, "did not receive extension data in response")
470
		receivedExtensionData <- data
Hannah Howard's avatar
Hannah Howard committed
471 472 473 474 475 476 477 478
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
479
	}
Hannah Howard's avatar
Hannah Howard committed
480 481
	td.responseHooks.Register(hook)
	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), extension1, extension2)
482

Hannah Howard's avatar
Hannah Howard committed
483
	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
484 485 486

	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
487 488
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension correctly")
489 490

	returnedData2, found := gsr.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
491 492
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode second extension correctly")
493

494 495
	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
496
		expectedUpdate := testutil.RandomBytes(100)
497 498 499 500 501 502 503 504 505 506 507 508 509
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		expectedError <- nil
Hannah Howard's avatar
Hannah Howard committed
510 511 512 513 514 515
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
				Name: extensionName1,
				Data: expectedUpdate,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
516
		td.requestManager.ProcessResponses(peers[0], firstResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
517 518 519
		var received []byte
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
520

Hannah Howard's avatar
Hannah Howard committed
521
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
522 523 524 525
		receivedUpdateData, has := rr.gsr.Extension(extensionName1)
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

526
		nextExpectedData := testutil.RandomBytes(100)
Hannah Howard's avatar
Hannah Howard committed
527 528
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
529 530 531 532 533 534 535 536 537 538 539 540 541 542

		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nil,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		expectedError <- errors.New("a terrible thing happened")
Hannah Howard's avatar
Hannah Howard committed
543 544 545 546 547 548 549 550 551 552
		expectedUpdateChan <- []graphsync.ExtensionData{
			{
				Name: extensionName1,
				Data: nextExpectedUpdate1,
			},
			{
				Name: extensionName2,
				Data: nextExpectedUpdate2,
			},
		}
Hannah Howard's avatar
Hannah Howard committed
553
		td.requestManager.ProcessResponses(peers[0], secondResponses, nil)
Hannah Howard's avatar
Hannah Howard committed
554 555
		testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
		require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
Hannah Howard's avatar
Hannah Howard committed
556

Hannah Howard's avatar
Hannah Howard committed
557
		rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
Hannah Howard's avatar
Hannah Howard committed
558 559 560 561 562 563 564
		receivedUpdateData, has = rr.gsr.Extension(extensionName1)
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
		receivedUpdateData, has = rr.gsr.Extension(extensionName2)
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

565 566 567
		testutil.VerifySingleTerminalError(requestCtx, t, returnedErrorChan)
		testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
	})
568
}
569

Hannah Howard's avatar
Hannah Howard committed
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
// TestBlockHooks exercises the incoming block hooks of the request manager.
//
// It registers a block hook that records every (response, block) pair it is
// called with and whose outcome (terminate with an error and/or update the
// request with extensions) is scripted by the test through channels. A request
// carrying two extensions is sent, responses are delivered in two batches, and
// for each batch the test checks that:
//   - the hook saw the response data and block data for every block,
//   - an update request was sent carrying the extensions the hook asked for.
//
// Finally it checks that the request finished without errors and returned the
// whole chain.
func TestBlockHooks(t *testing.T) {
	ctx := context.Background()
	td := newTestData(ctx, t)

	requestCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}

	// The hook reports what it received on receivedBlocks/receivedResponses and
	// blocks until the test has scripted an error and an update for the call.
	// The channels are buffered so the test can queue a whole batch up front.
	receivedBlocks := make(chan graphsync.BlockData, 4)
	receivedResponses := make(chan graphsync.ResponseData, 4)
	expectedError := make(chan error, 4)
	expectedUpdateChan := make(chan []graphsync.ExtensionData, 4)
	hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
		receivedBlocks <- blockData
		receivedResponses <- responseData
		err := <-expectedError
		if err != nil {
			hookActions.TerminateWithError(err)
		}
		update := <-expectedUpdateChan
		if len(update) > 0 {
			hookActions.UpdateRequestWithExtensions(update...)
		}
	}
	td.blockHooks.Register(hook)

	// queueHookResults scripts blockCount hook invocations: none of them
	// terminates the request, and only the last one requests finalUpdate.
	queueHookResults := func(blockCount int, finalUpdate []graphsync.ExtensionData) {
		for i := 0; i < blockCount; i++ {
			expectedError <- nil
			var update []graphsync.ExtensionData
			if i == blockCount-1 {
				update = finalUpdate
			}
			expectedUpdateChan <- update
		}
	}

	// verifyHookCalls checks that the hook was invoked once per block in blks,
	// each time with the data of response and with the matching block.
	verifyHookCalls := func(t *testing.T, blks []blocks.Block, response gsmsg.GraphSyncResponse, encodedMetadata []byte, extensionData []byte) {
		t.Helper()
		for _, blk := range blks {
			var receivedResponse graphsync.ResponseData
			testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data")
			require.Equal(t, response.RequestID(), receivedResponse.RequestID(), "did not receive correct response ID")
			require.Equal(t, response.Status(), receivedResponse.Status(), "did not receive correct response status")
			receivedMetadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata)
			require.True(t, has)
			require.Equal(t, encodedMetadata, receivedMetadata, "should receive correct metadata")
			receivedExtensionData, has := receivedResponse.Extension(extensionName1)
			require.True(t, has)
			require.Equal(t, extensionData, receivedExtensionData, "should receive correct response extension data")
			var receivedBlock graphsync.BlockData
			testutil.AssertReceive(ctx, t, receivedBlocks, &receivedBlock, "did not receive block data")
			require.Equal(t, blk.Cid(), receivedBlock.Link().(cidlink.Link).Cid)
			require.Equal(t, uint64(len(blk.RawData())), receivedBlock.BlockSize())
		}
	}

	returnedResponseChan, returnedErrorChan := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), extension1, extension2)

	rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]

	// The outgoing request must carry both extensions unchanged.
	gsr := rr.gsr
	returnedData1, found := gsr.Extension(extensionName1)
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension correctly")

	returnedData2, found := gsr.Extension(extensionName2)
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode second extension correctly")

	t.Run("responding to extensions", func(t *testing.T) {
		expectedData := testutil.RandomBytes(100)
		expectedUpdate := testutil.RandomBytes(100)

		// First batch: a partial response covering Blocks(0, 3).
		firstBlocks := td.blockChain.Blocks(0, 3)
		firstMetadata := metadataForBlocks(firstBlocks, true)
		firstMetadataEncoded, err := metadata.EncodeMetadata(firstMetadata)
		require.NoError(t, err, "did not encode metadata")
		firstResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.PartialResponse, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: firstMetadataEncoded,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: expectedData,
				},
			),
		}
		// Script the hook before delivering the responses so it never stalls.
		queueHookResults(len(firstBlocks), []graphsync.ExtensionData{
			{
				Name: extensionName1,
				Data: expectedUpdate,
			},
		})

		td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
		td.fal.verifyLastProcessedBlocks(ctx, t, firstBlocks)
		td.fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
			rr.gsr.ID(): firstMetadata,
		})
		td.fal.successResponseOn(rr.gsr.ID(), firstBlocks)

		// The update requested by the hook on the last block goes out as a new
		// network request.
		ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
		receivedUpdateData, has := ur.gsr.Extension(extensionName1)
		require.True(t, has)
		require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")

		verifyHookCalls(t, firstBlocks, firstResponses[0], firstMetadataEncoded, expectedData)

		// Second batch: the remaining blocks, completing the request.
		nextExpectedData := testutil.RandomBytes(100)
		nextExpectedUpdate1 := testutil.RandomBytes(100)
		nextExpectedUpdate2 := testutil.RandomBytes(100)
		nextBlocks := td.blockChain.RemainderBlocks(3)
		nextMetadata := metadataForBlocks(nextBlocks, true)
		nextMetadataEncoded, err := metadata.EncodeMetadata(nextMetadata)
		require.NoError(t, err)
		secondResponses := []gsmsg.GraphSyncResponse{
			gsmsg.NewResponse(gsr.ID(),
				graphsync.RequestCompletedFull, graphsync.ExtensionData{
					Name: graphsync.ExtensionMetadata,
					Data: nextMetadataEncoded,
				},
				graphsync.ExtensionData{
					Name: extensionName1,
					Data: nextExpectedData,
				},
			),
		}
		queueHookResults(len(nextBlocks), []graphsync.ExtensionData{
			{
				Name: extensionName1,
				Data: nextExpectedUpdate1,
			},
			{
				Name: extensionName2,
				Data: nextExpectedUpdate2,
			},
		})
		td.requestManager.ProcessResponses(peers[0], secondResponses, nextBlocks)
		td.fal.verifyLastProcessedBlocks(ctx, t, nextBlocks)
		td.fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
			rr.gsr.ID(): nextMetadata,
		})
		td.fal.successResponseOn(rr.gsr.ID(), nextBlocks)

		ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
		receivedUpdateData, has = ur.gsr.Extension(extensionName1)
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
		receivedUpdateData, has = ur.gsr.Extension(extensionName2)
		require.True(t, has)
		require.Equal(t, nextExpectedUpdate2, receivedUpdateData, "should have updated with correct extension")

		verifyHookCalls(t, nextBlocks, secondResponses[0], nextMetadataEncoded, nextExpectedData)

		testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
		td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
	})
}

func TestOutgoingRequestHooks(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
757
	td := newTestData(ctx, t)
758 759 760 761 762 763 764 765 766 767 768 769 770 771

	requestCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(1)

	extensionName1 := graphsync.ExtensionName("blockchain")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: nil,
	}

	hook := func(p peer.ID, r graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
		_, has := r.Extension(extensionName1)
		if has {
Hannah Howard's avatar
Hannah Howard committed
772
			ha.UseLinkTargetNodeStyleChooser(td.blockChain.Chooser)
773 774 775
			ha.UsePersistenceOption("chainstore")
		}
	}
Hannah Howard's avatar
Hannah Howard committed
776
	td.requestHooks.Register(hook)
777

Hannah Howard's avatar
Hannah Howard committed
778 779
	returnedResponseChan1, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), extension1)
	returnedResponseChan2, returnedErrorChan2 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
780

Hannah Howard's avatar
Hannah Howard committed
781
	requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
782

Hannah Howard's avatar
Hannah Howard committed
783
	md := metadataForBlocks(td.blockChain.AllBlocks(), true)
784 785 786 787 788 789 790 791 792 793
	mdEncoded, err := metadata.EncodeMetadata(md)
	require.NoError(t, err)
	mdExt := graphsync.ExtensionData{
		Name: graphsync.ExtensionMetadata,
		Data: mdEncoded,
	}
	responses := []gsmsg.GraphSyncResponse{
		gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
		gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt),
	}
Hannah Howard's avatar
Hannah Howard committed
794 795 796
	td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks())
	td.fal.verifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks())
	td.fal.verifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{
797 798 799
		requestRecords[0].gsr.ID(): md,
		requestRecords[1].gsr.ID(): md,
	})
Hannah Howard's avatar
Hannah Howard committed
800 801
	td.fal.successResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks())
	td.fal.successResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks())
802

Hannah Howard's avatar
Hannah Howard committed
803 804
	td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1)
	td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
805 806
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan1)
	testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan2)
Hannah Howard's avatar
Hannah Howard committed
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839
	td.fal.verifyStoreUsed(t, requestRecords[0].gsr.ID(), "chainstore")
	td.fal.verifyStoreUsed(t, requestRecords[1].gsr.ID(), "")
}

// testData bundles the request manager under test together with the fakes,
// hook registries and block fixtures the tests in this file share. It is
// populated by newTestData.
type testData struct {
	// requestRecordChan receives a record for every request handed to fph.
	requestRecordChan chan requestRecord
	// fph is the fake peer handler installed as the request manager's delegate.
	fph               *fakePeerHandler
	// fal is the fake async loader the request manager is constructed with.
	fal               *fakeAsyncLoader
	// requestHooks, responseHooks and blockHooks are the hook registries passed
	// to the request manager; tests register their hooks on them.
	requestHooks      *hooks.OutgoingRequestHooks
	responseHooks     *hooks.IncomingResponseHooks
	blockHooks        *hooks.IncomingBlockHooks
	// requestManager is the instance under test.
	requestManager    *RequestManager
	// blockStore is the map handed to testutil.NewTestStore, which returns
	// loader and storer.
	blockStore        map[ipld.Link][]byte
	loader            ipld.Loader
	storer            ipld.Storer
	// blockChain is the test chain built with loader and storer.
	blockChain        *testutil.TestBlockChain
}

// newTestData builds the shared fixture for request manager tests.
//
// It creates a fake peer handler (reporting on a channel buffered for three
// request records), a fake async loader and fresh hook registries, constructs
// a RequestManager from them, installs the peer handler as its delegate and
// starts it. It then sets up a map-backed test store and a test block chain
// on top of that store.
//
// ctx is passed to the request manager and to the block chain setup; t is
// used by the block chain setup to report failures.
func newTestData(ctx context.Context, t *testing.T) *testData {
	// Attribute fixture failures to the calling test rather than this helper.
	t.Helper()

	td := &testData{}
	td.requestRecordChan = make(chan requestRecord, 3)
	td.fph = &fakePeerHandler{requestRecordChan: td.requestRecordChan}
	td.fal = newFakeAsyncLoader()
	td.requestHooks = hooks.NewRequestHooks()
	td.responseHooks = hooks.NewResponseHooks()
	td.blockHooks = hooks.NewBlockHooks()
	td.requestManager = New(ctx, td.fal, td.requestHooks, td.responseHooks, td.blockHooks)
	td.requestManager.SetDelegate(td.fph)
	td.requestManager.Startup()
	td.blockStore = make(map[ipld.Link][]byte)
	td.loader, td.storer = testutil.NewTestStore(td.blockStore)
	td.blockChain = testutil.SetupBlockChain(ctx, t, td.loader, td.storer, 100, 5)
	return td
}