package graphsync

import (
	"context"
	"fmt"
	"io"
	"math"
	"math/rand"
	"reflect"
	"testing"
	"time"

	"github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"

	"github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/testbridge"
	"github.com/ipfs/go-graphsync/testutil"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-peer"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

26 27 28 29 30
type receivedMessage struct {
	message gsmsg.GraphSyncMessage
	sender  peer.ID
}

31 32
// Receiver is an interface for receiving messages from the GraphSyncNetwork.
type receiver struct {
33
	messageReceived chan receivedMessage
34 35 36 37 38 39
}

func (r *receiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
40

41 42
	select {
	case <-ctx.Done():
43
	case r.messageReceived <- receivedMessage{incoming, sender}:
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
	}
}

// ReceiveError is a no-op: any error passed to the receiver is discarded.
// NOTE(review): presumably this exists only so *receiver satisfies the
// delegate type expected by SetDelegate — confirm against the network package.
func (r *receiver) ReceiveError(err error) {
}

func TestMakeRequestToNetwork(t *testing.T) {
	// create network
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	mn := mocknet.New(ctx)

	// setup network
	host1, err := mn.GenPeer()
	if err != nil {
		t.Fatal("error generating host")
	}
	host2, err := mn.GenPeer()
	if err != nil {
		t.Fatal("error generating host")
	}
	err = mn.LinkAll()
	if err != nil {
		t.Fatal("error linking hosts")
	}

	gsnet1 := gsnet.NewFromLibp2pHost(host1)

	// setup receiving peer to just record message coming in
	gsnet2 := gsnet.NewFromLibp2pHost(host2)
	r := &receiver{
76
		messageReceived: make(chan receivedMessage),
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
	}
	gsnet2.SetDelegate(r)

	loader := func(ipldLink ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
		return nil, fmt.Errorf("unable to load block")
	}
	bridge := testbridge.NewMockIPLDBridge()
	graphSync := New(ctx, gsnet1, bridge, loader)

	cids := testutil.GenerateCids(5)
	spec := testbridge.NewMockSelectorSpec(cids)
	requestCtx, requestCancel := context.WithCancel(ctx)
	defer requestCancel()
	graphSync.Request(requestCtx, host2.ID(), spec)

92
	var message receivedMessage
93 94 95
	select {
	case <-ctx.Done():
		t.Fatal("did not receive message sent")
96
	case message = <-r.messageReceived:
97 98
	}

99
	sender := message.sender
100 101 102 103
	if sender != host1.ID() {
		t.Fatal("received message from wrong node")
	}

104
	received := message.message
105 106 107 108 109 110 111 112 113 114 115 116 117
	receivedRequests := received.Requests()
	if len(receivedRequests) != 1 {
		t.Fatal("Did not add request to received message")
	}
	receivedRequest := receivedRequests[0]
	receivedSpec, err := bridge.DecodeNode(receivedRequest.Selector())
	if err != nil {
		t.Fatal("unable to decode transmitted selector")
	}
	if !reflect.DeepEqual(spec, receivedSpec) {
		t.Fatal("did not transmit selector spec correctly")
	}
}
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215

// TestSendResponseToIncomingRequest sends a hand-built request from host1 to
// a GraphSync instance running on host2 and inspects what host1 gets back.
//
// The selector names every generated block, repeats the first one, and adds
// one freshly generated CID. The test expects exactly len(blks) blocks in
// total across all response messages, and a final response status of
// RequestCompletedPartial.
func TestSendResponseToIncomingRequest(t *testing.T) {
	// The one second deadline bounds the whole exchange.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Build a mock network with two linked peers.
	mn := mocknet.New(ctx)
	host1, err := mn.GenPeer()
	if err != nil {
		t.Fatal("error generating host")
	}
	host2, err := mn.GenPeer()
	if err != nil {
		t.Fatal("error generating host")
	}
	if err = mn.LinkAll(); err != nil {
		t.Fatal("error linking hosts")
	}

	// host1 is the requestor: it only records the messages sent back to it.
	requestorNet := gsnet.NewFromLibp2pHost(host1)
	recorder := &receiver{messageReceived: make(chan receivedMessage)}
	requestorNet.SetDelegate(recorder)

	// host2 is the responder: a GraphSync instance with a loader built from
	// the generated blocks is attached to its network.
	responderNet := gsnet.NewFromLibp2pHost(host2)
	blks := testutil.GenerateBlocksOfSize(5, 100)
	loader := testbridge.NewMockLoader(blks)
	bridge := testbridge.NewMockIPLDBridge()
	New(ctx, responderNet, bridge, loader)

	// Selector contents: every block, one repeat, and one extra CID.
	cids := make([]cid.Cid, 0, len(blks)+2)
	for i := range blks {
		cids = append(cids, blks[i].Cid())
	}
	// repeat of the first block, which should be deduped in the response
	cids = append(cids, blks[0].Cid())
	// a CID generated separately from blks
	cids = append(cids, testutil.GenerateCids(1)[0])

	selectorData, err := bridge.EncodeNode(testbridge.NewMockSelectorSpec(cids))
	if err != nil {
		t.Fatal("could not encode selector spec")
	}
	requestID := gsmsg.GraphSyncRequestID(rand.Int31())

	// send request across network
	outgoing := gsmsg.New()
	outgoing.AddRequest(gsmsg.NewRequest(requestID, selectorData, gsmsg.GraphSyncPriority(math.MaxInt32)))
	requestorNet.SendMessage(ctx, host2.ID(), outgoing)

	// Read everything sent back to the requestor, collecting blocks until a
	// response arrives whose status is no longer PartialResponse.
	var (
		lastMessage gsmsg.GraphSyncMessage
		gotBlocks   []blocks.Block
	)
	for finished := false; !finished; {
		select {
		case <-ctx.Done():
			t.Fatal("did not receive complete response")
		case incoming := <-recorder.messageReceived:
			if incoming.sender != host2.ID() {
				t.Fatal("received message from wrong node")
			}

			lastMessage = incoming.message
			gotBlocks = append(gotBlocks, lastMessage.Blocks()...)

			responses := lastMessage.Responses()
			if len(responses) != 1 {
				t.Fatal("Did not receive response")
			}
			response := responses[0]
			if response.RequestID() != requestID {
				t.Fatal("Sent response for incorrect request id")
			}
			finished = response.Status() != gsmsg.PartialResponse
		}
	}

	if len(gotBlocks) != len(blks) {
		t.Fatal("Send incorrect number of blocks or there were duplicate blocks")
	}

	// there should have been a missing CID
	if lastMessage.Responses()[0].Status() != gsmsg.RequestCompletedPartial {
		t.Fatal("transmitted full response when only partial was transmitted")
	}
}