peermanager_test.go 4.38 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
package peermanager

import (
	"context"
	"math/rand"
	"reflect"
	"testing"
	"time"

	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/libp2p/go-libp2p-peer"
)

// messageSent records a single message produced by a fake peer queue,
// paired with the ID of the peer that queue was created for.
type messageSent struct {
	// p is the peer ID held by the fake queue that built the message.
	p       peer.ID
	// message is the GraphSync message the fake queue built.
	message gsmsg.GraphSyncMessage
}

// fakePeer is the test double returned as a PeerQueue by
// makePeerQueueFactory. It reports every AddRequest and Cancel call on
// messagesSent so tests can observe what was handed to the queue.
type fakePeer struct {
	// p is the peer this fake queue was created for.
	p            peer.ID
	// messagesSent receives one messageSent per AddRequest or Cancel call.
	messagesSent chan messageSent
}

// Startup and Shutdown are intentionally empty: the fake queue has nothing
// to start or stop.
func (fp *fakePeer) Startup()  {}
func (fp *fakePeer) Shutdown() {}

// AddRequest wraps the given request in a freshly created GraphSync message
// and reports it, tagged with this fake's peer ID, on messagesSent.
func (fp *fakePeer) AddRequest(
	id gsmsg.GraphSyncRequestID,
	selector []byte,
	priority gsmsg.GraphSyncPriority,
) {
	msg := gsmsg.New()
	msg.AddRequest(id, selector, priority)
	fp.messagesSent <- messageSent{p: fp.p, message: msg}
}

// Cancel wraps a cancel for the given request ID in a freshly created
// GraphSync message and reports it, tagged with this fake's peer ID, on
// messagesSent.
func (fp *fakePeer) Cancel(id gsmsg.GraphSyncRequestID) {
	msg := gsmsg.New()
	msg.Cancel(id)
	fp.messagesSent <- messageSent{p: fp.p, message: msg}
}

// makePeerQueueFactory returns a PeerQueueFactory whose queues are fakePeer
// values that all report on the supplied messagesSent channel. The context
// passed to the factory is ignored.
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
	return func(_ context.Context, id peer.ID) PeerQueue {
		queue := new(fakePeer)
		queue.p = id
		queue.messagesSent = messagesSent
		return queue
	}
}

// TestAddingAndRemovingPeers verifies that ConnectedPeers reflects Connected
// and Disconnected calls, including the case of a peer that was connected
// more than once.
func TestAddingAndRemovingPeers(t *testing.T) {
	// The fake queues only touch their channel in AddRequest and Cancel,
	// which this test never calls directly, so a nil channel is passed.
	factory := makePeerQueueFactory(nil)

	peers := testutil.GeneratePeers(5)
	joining, absent := peers[:3], peers[3:]
	manager := New(context.Background(), factory)

	for _, p := range joining {
		manager.Connected(p)
	}

	current := manager.ConnectedPeers()

	for _, p := range joining {
		if !testutil.ContainsPeer(current, p) {
			t.Fatal("Peers not connected that should be connected")
		}
	}

	for _, p := range absent {
		if testutil.ContainsPeer(current, p) {
			t.Fatal("Peers connected that shouldn't be connected")
		}
	}

	// A peer connected exactly once must be gone after one Disconnected call.
	once := peers[0]
	manager.Disconnected(once)
	current = manager.ConnectedPeers()
	if testutil.ContainsPeer(current, once) {
		t.Fatal("Peer should have been disconnected but was not")
	}

	// A peer connected twice and disconnected once must still be listed.
	twice := peers[1]
	manager.Connected(twice)
	manager.Disconnected(twice)
	current = manager.ConnectedPeers()
	if !testutil.ContainsPeer(current, twice) {
		t.Fatal("Peer was disconnected but should not have been")
	}
}

// TestSendingMessagesToPeers checks that SendRequest and CancelRequest hand
// each request to the queue of the targeted peer, in call order, and that
// every peer that was sent a message is afterwards reported as connected.
func TestSendingMessagesToPeers(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	// The fake peer queues send on this channel from AddRequest/Cancel; the
	// buffer lets those sends complete before the test starts receiving.
	messagesSent := make(chan messageSent, 5)
	peerQueueFactory := makePeerQueueFactory(messagesSent)

	tp := testutil.GeneratePeers(5)

	id := gsmsg.GraphSyncRequestID(rand.Int31())
	priority := gsmsg.GraphSyncPriority(rand.Int31())
	selector := testutil.RandomBytes(100)

	peerManager := New(ctx, peerQueueFactory)

	peerManager.SendRequest(tp[0], id, selector, priority)
	peerManager.SendRequest(tp[1], id, selector, priority)
	peerManager.CancelRequest(tp[0], id)

	// receive returns the next message reported by a fake peer queue and
	// fails the test if the context expires first. ordinal ("first",
	// "second", ...) is only used to label the failure message.
	receive := func(ordinal string) messageSent {
		t.Helper()
		select {
		case <-ctx.Done():
			t.Fatal("did not send " + ordinal + " message")
		case sent := <-messagesSent:
			return sent
		}
		// Not reached at run time: t.Fatal stops the test goroutine. The
		// return only satisfies the compiler.
		return messageSent{}
	}

	firstMessage := receive("first")
	if firstMessage.p != tp[0] {
		t.Fatal("First message sent to wrong peer")
	}
	// Guard the index below so an empty message fails the test cleanly
	// instead of panicking the whole test binary.
	firstRequests := firstMessage.message.Requests()
	if len(firstRequests) == 0 {
		t.Fatal("first message contained no requests")
	}
	request := firstRequests[0]
	if request.ID() != id ||
		request.IsCancel() ||
		request.Priority() != priority ||
		!reflect.DeepEqual(request.Selector(), selector) {
		t.Fatal("did not send correct first message")
	}

	secondMessage := receive("second")
	if secondMessage.p != tp[1] {
		t.Fatal("Second message sent to wrong peer")
	}
	secondRequests := secondMessage.message.Requests()
	if len(secondRequests) == 0 {
		t.Fatal("second message contained no requests")
	}
	request = secondRequests[0]
	if request.ID() != id ||
		request.IsCancel() ||
		request.Priority() != priority ||
		!reflect.DeepEqual(request.Selector(), selector) {
		t.Fatal("did not send correct second message")
	}

	thirdMessage := receive("third")
	if thirdMessage.p != tp[0] {
		t.Fatal("Third message sent to wrong peer")
	}
	thirdRequests := thirdMessage.message.Requests()
	if len(thirdRequests) == 0 {
		t.Fatal("third message contained no requests")
	}
	request = thirdRequests[0]
	// Only ID and cancel flag are checked for the cancel message.
	if request.ID() != id || !request.IsCancel() {
		t.Fatal("third message was not a cancel")
	}

	connectedPeers := peerManager.ConnectedPeers()
	if len(connectedPeers) != 2 ||
		!testutil.ContainsPeer(connectedPeers, tp[0]) ||
		!testutil.ContainsPeer(connectedPeers, tp[1]) {
		t.Fatal("did not connect all peers that were sent messages")
	}
}