peermessagemanager_test.go 2.96 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
package peermanager

import (
	"context"
	"math/rand"
	"reflect"
	"testing"
	"time"

	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/libp2p/go-libp2p-peer"
)

// messageSent records one message handed to a fake peer queue, paired with
// the ID of the peer whose queue received it.
type messageSent struct {
	p       peer.ID                // peer whose fake queue produced this record
	message gsmsg.GraphSyncMessage // message built by that queue
}

// fakePeer is the test double returned by makePeerQueueFactory in place of a
// real peer queue. Instead of delivering requests anywhere, it reports each
// one on messagesSent so the test can inspect what was queued and for whom.
type fakePeer struct {
	p            peer.ID          // ID of the peer this fake queue stands for
	messagesSent chan messageSent // channel on which AddRequest reports
}

// Startup is a no-op; fakePeer does nothing when started.
func (fp *fakePeer) Startup() {
	// intentionally empty
}

// Shutdown is a no-op; fakePeer does nothing when shut down.
func (fp *fakePeer) Shutdown() {
	// intentionally empty
}

// AddRequest wraps graphSyncRequest in a message obtained from gsmsg.New and
// reports it, tagged with this fake peer's ID, on the messagesSent channel.
// The send blocks if the channel has no free buffer space and no receiver.
func (fp *fakePeer) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest) {
	msg := gsmsg.New()
	msg.AddRequest(graphSyncRequest)

	record := messageSent{
		p:       fp.p,
		message: msg,
	}
	fp.messagesSent <- record
}

// makePeerQueueFactory returns a PeerQueueFactory that produces fakePeer
// queues. Every queue it creates reports on the same messagesSent channel,
// and the context passed to the factory is ignored.
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
	newFakeQueue := func(_ context.Context, id peer.ID) PeerQueue {
		fake := new(fakePeer)
		fake.p = id
		fake.messagesSent = messagesSent
		return fake
	}
	return newFakeQueue
}

// TestSendingMessagesToPeers sends a request to two different peers and then
// a cancel to the first peer through the message manager, and verifies that:
//
//   - each message reaches the fake queue of the intended peer, in order,
//   - the request fields (ID, priority, selector, cancel flag) survive intact,
//   - the manager reports exactly the two peers that were sent messages as
//     connected.
//
// The whole test is bounded by a one second timeout.
func TestSendingMessagesToPeers(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	messagesSent := make(chan messageSent, 5)
	peerQueueFactory := makePeerQueueFactory(messagesSent)

	tp := testutil.GeneratePeers(5)

	id := gsmsg.GraphSyncRequestID(rand.Int31())
	priority := gsmsg.GraphSyncPriority(rand.Int31())
	selector := testutil.RandomBytes(100)

	peerManager := NewMessageManager(ctx, peerQueueFactory)

	request := gsmsg.NewRequest(id, selector, priority)
	peerManager.SendRequest(tp[0], request)
	peerManager.SendRequest(tp[1], request)
	cancelRequest := gsmsg.CancelRequest(id)
	peerManager.SendRequest(tp[0], cancelRequest)

	// nextRequest waits for the next recorded message, checks that it was
	// queued for wantPeer, and returns its first request. It fails the test
	// (instead of panicking on an out-of-range index) when the message carries
	// no requests at all.
	nextRequest := func(wantPeer peer.ID, timeoutMsg, wrongPeerMsg string) gsmsg.GraphSyncRequest {
		t.Helper()
		var got gsmsg.GraphSyncRequest
		select {
		case <-ctx.Done():
			t.Fatal(timeoutMsg)
		case sent := <-messagesSent:
			if sent.p != wantPeer {
				t.Fatal(wrongPeerMsg)
			}
			requests := sent.message.Requests()
			if len(requests) == 0 {
				t.Fatalf("message sent to peer %v contained no requests", sent.p)
			}
			got = requests[0]
		}
		return got
	}

	first := nextRequest(tp[0], "did not send first message", "First message sent to wrong peer")
	if first.ID() != id ||
		first.IsCancel() ||
		first.Priority() != priority ||
		!reflect.DeepEqual(first.Selector(), selector) {
		t.Fatal("did not send correct first message")
	}

	second := nextRequest(tp[1], "did not send second message", "Second message sent to wrong peer")
	if second.ID() != id ||
		second.IsCancel() ||
		second.Priority() != priority ||
		!reflect.DeepEqual(second.Selector(), selector) {
		t.Fatal("did not send correct second message")
	}

	third := nextRequest(tp[0], "did not send third message", "Third message sent to wrong peer")
	if third.ID() != id || !third.IsCancel() {
		t.Fatal("third message was not a cancel")
	}

	// Only the two peers that were actually sent messages should be tracked.
	connectedPeers := peerManager.ConnectedPeers()
	if len(connectedPeers) != 2 ||
		!testutil.ContainsPeer(connectedPeers, tp[0]) ||
		!testutil.ContainsPeer(connectedPeers, tp[1]) {
		t.Fatal("did not connect all peers that were sent messages")
	}
}