loader_test.go 3.53 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
package loader

import (
	"context"
	"errors"
	"io"
	"io/ioutil"
	"math/rand"
	"reflect"
	"testing"
	"time"

	"github.com/ipfs/go-graphsync/ipldbridge"
14
	"github.com/ipfs/go-graphsync/requestmanager/types"
15 16 17 18 19 20 21 22 23 24 25 26 27

	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/ipld/go-ipld-prime"

	gsmsg "github.com/ipfs/go-graphsync/message"
)

type callParams struct {
	requestID gsmsg.GraphSyncRequestID
	link      ipld.Link
}

28 29
func makeAsyncLoadFn(responseChan chan types.AsyncLoadResult, calls chan callParams) AsyncLoadFn {
	return func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult {
30 31 32 33 34 35 36 37 38
		calls <- callParams{requestID, link}
		return responseChan
	}
}

func TestWrappedAsyncLoaderReturnsValues(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
39
	responseChan := make(chan types.AsyncLoadResult, 1)
40 41 42 43 44 45 46 47
	calls := make(chan callParams, 1)
	asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
	errChan := make(chan error)
	requestID := gsmsg.GraphSyncRequestID(rand.Int31())
	loader := WrapAsyncLoader(ctx, asyncLoadFn, requestID, errChan)

	link := testbridge.NewMockLink()
	data := testutil.RandomBytes(100)
48
	responseChan <- types.AsyncLoadResult{Data: data, Err: nil}
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	stream, err := loader(link, ipldbridge.LinkContext{})
	if err != nil {
		t.Fatal("Should not have errored on load")
	}
	returnedData, err := ioutil.ReadAll(stream)
	if err != nil {
		t.Fatal("error in return stream")
	}
	if !reflect.DeepEqual(data, returnedData) {
		t.Fatal("returned data did not match expected")
	}
}

func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
66
	responseChan := make(chan types.AsyncLoadResult, 1)
67 68 69 70 71 72 73 74
	calls := make(chan callParams, 1)
	asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
	errChan := make(chan error, 1)
	requestID := gsmsg.GraphSyncRequestID(rand.Int31())
	loader := WrapAsyncLoader(ctx, asyncLoadFn, requestID, errChan)

	link := testbridge.NewMockLink()
	err := errors.New("something went wrong")
75
	responseChan <- types.AsyncLoadResult{Data: nil, Err: err}
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
	stream, loadErr := loader(link, ipldbridge.LinkContext{})
	if stream != nil || loadErr != ipldbridge.ErrDoNotFollow() {
		t.Fatal("Should have errored on load")
	}
	select {
	case <-ctx.Done():
		t.Fatal("should have returned an error on side channel but didn't")
	case returnedErr := <-errChan:
		if returnedErr != err {
			t.Fatal("returned wrong error on side channel")
		}
	}
}

func TestWrappedAsyncLoaderContextCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	subCtx, subCancel := context.WithCancel(ctx)
95
	responseChan := make(chan types.AsyncLoadResult, 1)
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
	calls := make(chan callParams, 1)
	asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
	errChan := make(chan error, 1)
	requestID := gsmsg.GraphSyncRequestID(rand.Int31())
	loader := WrapAsyncLoader(subCtx, asyncLoadFn, requestID, errChan)
	link := testbridge.NewMockLink()
	resultsChan := make(chan struct {
		io.Reader
		error
	})
	go func() {
		stream, err := loader(link, ipldbridge.LinkContext{})
		resultsChan <- struct {
			io.Reader
			error
		}{stream, err}
	}()
	subCancel()

	select {
	case <-ctx.Done():
		t.Fatal("should have returned from context cancelling but didn't")
	case result := <-resultsChan:
		if result.Reader != nil || result.error == nil {
			t.Fatal("should have errored from context cancelling but didn't")
		}
	}
}