responsecollector_test.go 2.21 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"fmt"
6 7 8
	"testing"
	"time"

9
	ipld "github.com/ipld/go-ipld-prime"
10
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/stretchr/testify/require"
Hannah Howard's avatar
Hannah Howard committed
12 13 14

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/testutil"
15 16 17 18 19 20 21 22 23
)

func TestBufferingResponseProgress(t *testing.T) {
	backgroundCtx := context.Background()
	ctx, cancel := context.WithTimeout(backgroundCtx, time.Second)
	defer cancel()
	rc := newResponseCollector(ctx)
	requestCtx, requestCancel := context.WithCancel(backgroundCtx)
	defer requestCancel()
24
	incomingResponses := make(chan graphsync.ResponseProgress)
25
	incomingErrors := make(chan error)
26 27
	cancelRequest := func() {}

28 29
	outgoingResponses, outgoingErrors := rc.collectResponses(
		requestCtx, incomingResponses, incomingErrors, cancelRequest)
30

31
	blockStore := make(map[ipld.Link][]byte)
32
	loader, storer := testutil.NewTestStore(blockStore)
33 34
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10)
	blocks := blockChain.AllBlocks()
35

36
	for i, block := range blocks {
Hannah Howard's avatar
Hannah Howard committed
37
		testutil.AssertSends(ctx, t, incomingResponses, graphsync.ResponseProgress{
38
			Node: blockChain.NodeTipIndex(i),
39
			LastBlock: struct {
40 41
				Path ipld.Path
				Link ipld.Link
42
			}{ipld.Path{}, cidlink.Link{Cid: block.Cid()}},
Hannah Howard's avatar
Hannah Howard committed
43
		}, "did not write block to channel")
44 45
	}

46 47
	interimError := fmt.Errorf("A block was missing")
	terminalError := fmt.Errorf("Something terrible happened")
Hannah Howard's avatar
Hannah Howard committed
48 49
	testutil.AssertSends(ctx, t, incomingErrors, interimError, "did not write error to channel")
	testutil.AssertSends(ctx, t, incomingErrors, terminalError, "did not write error to channel")
50

51
	for _, block := range blocks {
Hannah Howard's avatar
Hannah Howard committed
52 53 54
		var testResponse graphsync.ResponseProgress
		testutil.AssertReceive(ctx, t, outgoingResponses, &testResponse, "should read from outgoing responses")
		require.Equal(t, block.Cid(), testResponse.LastBlock.Link.(cidlink.Link).Cid, "did not store block correctly")
55
	}
56 57

	for i := 0; i < 2; i++ {
Hannah Howard's avatar
Hannah Howard committed
58 59 60 61 62 63
		var testErr error
		testutil.AssertReceive(ctx, t, outgoingErrors, &testErr, "should have read from channel but couldn't")
		if i == 0 {
			require.Equal(t, interimError, testErr, "incorrect error message sent")
		} else {
			require.Equal(t, terminalError, testErr, "incorrect error message sent")
64 65
		}
	}
66
}