// responsecollector_test.go

package requestmanager

import (
	"context"
	"fmt"
	"reflect"
	"testing"
	"time"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

func TestBufferingResponseProgress(t *testing.T) {
	backgroundCtx := context.Background()
	ctx, cancel := context.WithTimeout(backgroundCtx, time.Second)
	defer cancel()
	rc := newResponseCollector(ctx)
	requestCtx, requestCancel := context.WithCancel(backgroundCtx)
	defer requestCancel()
25
	incomingResponses := make(chan graphsync.ResponseProgress)
26
	incomingErrors := make(chan error)
27 28
	cancelRequest := func() {}

29 30
	outgoingResponses, outgoingErrors := rc.collectResponses(
		requestCtx, incomingResponses, incomingErrors, cancelRequest)
31

32 33 34 35
	blockStore := make(map[ipld.Link][]byte)
	loader, storer := testbridge.NewMockStore(blockStore)
	blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 10)
	blocks := blockChain.AllBlocks()
36

37
	for i, block := range blocks {
38 39 40
		select {
		case <-ctx.Done():
			t.Fatal("should have written to channel but couldn't")
41
		case incomingResponses <- graphsync.ResponseProgress{
42
			Node: blockChain.NodeTipIndex(i),
43
			LastBlock: struct {
44 45
				Path ipld.Path
				Link ipld.Link
46 47
			}{ipld.Path{}, cidlink.Link{Cid: block.Cid()}},
		}:
48 49 50
		}
	}

51 52
	interimError := fmt.Errorf("A block was missing")
	terminalError := fmt.Errorf("Something terrible happened")
53 54 55 56 57 58 59 60 61 62 63
	select {
	case <-ctx.Done():
		t.Fatal("should have written error to channel but didn't")
	case incomingErrors <- interimError:
	}
	select {
	case <-ctx.Done():
		t.Fatal("should have written error to channel but didn't")
	case incomingErrors <- terminalError:
	}

64 65 66 67
	for _, block := range blocks {
		select {
		case <-ctx.Done():
			t.Fatal("should have read from channel but couldn't")
68 69
		case testResponse := <-outgoingResponses:
			if testResponse.LastBlock.Link.(cidlink.Link).Cid != block.Cid() {
70 71 72 73
				t.Fatal("stored blocks incorrectly")
			}
		}
	}
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
			t.Fatal("should have read from channel but couldn't")
		case testErr := <-outgoingErrors:
			if i == 0 {
				if !reflect.DeepEqual(testErr, interimError) {
					t.Fatal("incorrect error message sent")
				}
			} else {
				if !reflect.DeepEqual(testErr, terminalError) {
					t.Fatal("incorrect error message sent")
				}
			}
		}
	}
91
}