responsecollector_test.go 2.21 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"fmt"
6 7 8
	"testing"
	"time"

9
	"github.com/ipfs/go-graphsync"
10
	"github.com/ipfs/go-graphsync/testutil"
11
	ipld "github.com/ipld/go-ipld-prime"
12
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/stretchr/testify/require"
14 15 16 17 18 19 20 21 22
)

func TestBufferingResponseProgress(t *testing.T) {
	backgroundCtx := context.Background()
	ctx, cancel := context.WithTimeout(backgroundCtx, time.Second)
	defer cancel()
	rc := newResponseCollector(ctx)
	requestCtx, requestCancel := context.WithCancel(backgroundCtx)
	defer requestCancel()
23
	incomingResponses := make(chan graphsync.ResponseProgress)
24
	incomingErrors := make(chan error)
25 26
	cancelRequest := func() {}

27 28
	outgoingResponses, outgoingErrors := rc.collectResponses(
		requestCtx, incomingResponses, incomingErrors, cancelRequest)
29

30
	blockStore := make(map[ipld.Link][]byte)
31
	loader, storer := testutil.NewTestStore(blockStore)
32 33
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10)
	blocks := blockChain.AllBlocks()
34

35
	for i, block := range blocks {
Hannah Howard's avatar
Hannah Howard committed
36
		testutil.AssertSends(ctx, t, incomingResponses, graphsync.ResponseProgress{
37
			Node: blockChain.NodeTipIndex(i),
38
			LastBlock: struct {
39 40
				Path ipld.Path
				Link ipld.Link
41
			}{ipld.Path{}, cidlink.Link{Cid: block.Cid()}},
Hannah Howard's avatar
Hannah Howard committed
42
		}, "did not write block to channel")
43 44
	}

45 46
	interimError := fmt.Errorf("A block was missing")
	terminalError := fmt.Errorf("Something terrible happened")
Hannah Howard's avatar
Hannah Howard committed
47 48
	testutil.AssertSends(ctx, t, incomingErrors, interimError, "did not write error to channel")
	testutil.AssertSends(ctx, t, incomingErrors, terminalError, "did not write error to channel")
49

50
	for _, block := range blocks {
Hannah Howard's avatar
Hannah Howard committed
51 52 53
		var testResponse graphsync.ResponseProgress
		testutil.AssertReceive(ctx, t, outgoingResponses, &testResponse, "should read from outgoing responses")
		require.Equal(t, block.Cid(), testResponse.LastBlock.Link.(cidlink.Link).Cid, "did not store block correctly")
54
	}
55 56

	for i := 0; i < 2; i++ {
Hannah Howard's avatar
Hannah Howard committed
57 58 59 60 61 62
		var testErr error
		testutil.AssertReceive(ctx, t, outgoingErrors, &testErr, "should have read from channel but couldn't")
		if i == 0 {
			require.Equal(t, interimError, testErr, "incorrect error message sent")
		} else {
			require.Equal(t, terminalError, testErr, "incorrect error message sent")
63 64
		}
	}
65
}