peermanager_test.go 8.58 KB
Newer Older
1 2 3 4 5
package peermanager

import (
	"context"
	"testing"
6
	"time"
7

8
	"github.com/ipfs/go-bitswap/internal/testutil"
dirkmc's avatar
dirkmc committed
9
	cid "github.com/ipfs/go-cid"
10

Raúl Kripalani's avatar
Raúl Kripalani committed
11
	"github.com/libp2p/go-libp2p-core/peer"
12 13
)

dirkmc's avatar
dirkmc committed
14 15 16 17 18
type msg struct {
	p          peer.ID
	wantBlocks []cid.Cid
	wantHaves  []cid.Cid
	cancels    []cid.Cid
19 20
}

dirkmc's avatar
dirkmc committed
21 22 23
type mockPeerQueue struct {
	p    peer.ID
	msgs chan msg
24 25
}

dirkmc's avatar
dirkmc committed
26 27
func (fp *mockPeerQueue) Startup()  {}
func (fp *mockPeerQueue) Shutdown() {}
hannahhoward's avatar
hannahhoward committed
28

dirkmc's avatar
dirkmc committed
29 30
func (fp *mockPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {
	fp.msgs <- msg{fp.p, nil, whs, nil}
31
}
dirkmc's avatar
dirkmc committed
32 33 34 35 36
func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
	fp.msgs <- msg{fp.p, wbs, whs, nil}
}
func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
	fp.msgs <- msg{fp.p, nil, nil, cs}
37 38
}

dirkmc's avatar
dirkmc committed
39 40 41 42 43 44 45 46
type peerWants struct {
	wantHaves  []cid.Cid
	wantBlocks []cid.Cid
	cancels    []cid.Cid
}

func collectMessages(ch chan msg, timeout time.Duration) map[peer.ID]peerWants {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
47
	defer cancel()
dirkmc's avatar
dirkmc committed
48 49

	collected := make(map[peer.ID]peerWants)
50 51
	for {
		select {
dirkmc's avatar
dirkmc committed
52 53 54 55
		case m := <-ch:
			pw, ok := collected[m.p]
			if !ok {
				pw = peerWants{}
56
			}
dirkmc's avatar
dirkmc committed
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
			pw.wantHaves = append(pw.wantHaves, m.wantHaves...)
			pw.wantBlocks = append(pw.wantBlocks, m.wantBlocks...)
			pw.cancels = append(pw.cancels, m.cancels...)
			collected[m.p] = pw
		case <-ctx.Done():
			return collected
		}
	}
}

func makePeerQueueFactory(msgs chan msg) PeerQueueFactory {
	return func(ctx context.Context, p peer.ID) PeerQueue {
		return &mockPeerQueue{
			p:    p,
			msgs: msgs,
72 73 74 75
		}
	}
}

76 77
func TestAddingAndRemovingPeers(t *testing.T) {
	ctx := context.Background()
dirkmc's avatar
dirkmc committed
78 79
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
80

dirkmc's avatar
dirkmc committed
81 82 83
	tp := testutil.GeneratePeers(6)
	self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
	peerManager := New(ctx, peerQueueFactory, self)
84 85 86 87 88 89 90

	peerManager.Connected(peer1, nil)
	peerManager.Connected(peer2, nil)
	peerManager.Connected(peer3, nil)

	connectedPeers := peerManager.ConnectedPeers()

91 92 93
	if !testutil.ContainsPeer(connectedPeers, peer1) ||
		!testutil.ContainsPeer(connectedPeers, peer2) ||
		!testutil.ContainsPeer(connectedPeers, peer3) {
94 95 96
		t.Fatal("Peers not connected that should be connected")
	}

97 98
	if testutil.ContainsPeer(connectedPeers, peer4) ||
		testutil.ContainsPeer(connectedPeers, peer5) {
99 100 101 102 103 104 105
		t.Fatal("Peers connected that shouldn't be connected")
	}

	// removing a peer with only one reference
	peerManager.Disconnected(peer1)
	connectedPeers = peerManager.ConnectedPeers()

106
	if testutil.ContainsPeer(connectedPeers, peer1) {
107 108 109 110 111 112 113 114
		t.Fatal("Peer should have been disconnected but was not")
	}

	// connecting a peer twice, then disconnecting once, should stay in queue
	peerManager.Connected(peer2, nil)
	peerManager.Disconnected(peer2)
	connectedPeers = peerManager.ConnectedPeers()

115
	if !testutil.ContainsPeer(connectedPeers, peer2) {
116 117 118
		t.Fatal("Peer was disconnected but should not have been")
	}
}
119

dirkmc's avatar
dirkmc committed
120 121 122 123 124 125 126 127
func TestBroadcastOnConnect(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(2)
	self, peer1 := tp[0], tp[1]
	peerManager := New(ctx, peerQueueFactory, self)
128

dirkmc's avatar
dirkmc committed
129
	cids := testutil.GenerateCids(2)
130

dirkmc's avatar
dirkmc committed
131 132 133
	// Connect with two broadcast wants for first peer
	peerManager.Connected(peer1, cids)
	collected := collectMessages(msgs, 2*time.Millisecond)
134

dirkmc's avatar
dirkmc committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
	if len(collected[peer1].wantHaves) != 2 {
		t.Fatal("Expected want-haves to be sent to newly connected peer")
	}
}

func TestBroadcastWantHaves(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(3)
	self, peer1, peer2 := tp[0], tp[1], tp[2]
	peerManager := New(ctx, peerQueueFactory, self)

	cids := testutil.GenerateCids(3)

	// Connect to first peer with two broadcast wants
	peerManager.Connected(peer1, []cid.Cid{cids[0], cids[1]})
	collected := collectMessages(msgs, 2*time.Millisecond)

	if len(collected[peer1].wantHaves) != 2 {
		t.Fatal("Expected want-haves to be sent to newly connected peer")
	}

	// Connect to second peer
160 161
	peerManager.Connected(peer2, nil)

dirkmc's avatar
dirkmc committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
	// Send a broadcast to all peers, including cid that was already sent to
	// first peer
	peerManager.BroadcastWantHaves(ctx, []cid.Cid{cids[0], cids[2]})
	collected = collectMessages(msgs, 2*time.Millisecond)

	// One of the want-haves was already sent to peer1
	if len(collected[peer1].wantHaves) != 1 {
		t.Fatal("Expected 1 want-haves to be sent to first peer", collected[peer1].wantHaves)
	}
	if len(collected[peer2].wantHaves) != 2 {
		t.Fatal("Expected 2 want-haves to be sent to second peer")
	}
}

func TestSendWants(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(2)
	self, peer1 := tp[0], tp[1]
	peerManager := New(ctx, peerQueueFactory, self)
	cids := testutil.GenerateCids(4)
185

dirkmc's avatar
dirkmc committed
186 187 188
	peerManager.Connected(peer1, nil)
	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0]}, []cid.Cid{cids[2]})
	collected := collectMessages(msgs, 2*time.Millisecond)
189

dirkmc's avatar
dirkmc committed
190 191
	if len(collected[peer1].wantHaves) != 1 {
		t.Fatal("Expected want-have to be sent to peer")
192
	}
dirkmc's avatar
dirkmc committed
193 194 195 196 197 198
	if len(collected[peer1].wantBlocks) != 1 {
		t.Fatal("Expected want-block to be sent to peer")
	}

	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2], cids[3]})
	collected = collectMessages(msgs, 2*time.Millisecond)
199

dirkmc's avatar
dirkmc committed
200 201 202 203
	// First want-have and want-block should be filtered (because they were
	// already sent)
	if len(collected[peer1].wantHaves) != 1 {
		t.Fatal("Expected want-have to be sent to peer")
204
	}
dirkmc's avatar
dirkmc committed
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
	if len(collected[peer1].wantBlocks) != 1 {
		t.Fatal("Expected want-block to be sent to peer")
	}
}

func TestSendCancels(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(3)
	self, peer1, peer2 := tp[0], tp[1], tp[2]
	peerManager := New(ctx, peerQueueFactory, self)
	cids := testutil.GenerateCids(4)

	// Connect to peer1 and peer2
	peerManager.Connected(peer1, nil)
	peerManager.Connected(peer2, nil)

	// Send 2 want-blocks and 1 want-have to peer1
	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

	// Clear messages
	collectMessages(msgs, 2*time.Millisecond)

	// Send cancels for 1 want-block and 1 want-have
	peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
	collected := collectMessages(msgs, 2*time.Millisecond)
233

dirkmc's avatar
dirkmc committed
234 235
	if _, ok := collected[peer2]; ok {
		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
236
	}
dirkmc's avatar
dirkmc committed
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
	if len(collected[peer1].cancels) != 2 {
		t.Fatal("Expected cancel to be sent for want-block and want-have sent to peer")
	}

	// Send cancels for all cids
	peerManager.SendCancels(ctx, cids)
	collected = collectMessages(msgs, 2*time.Millisecond)

	if _, ok := collected[peer2]; ok {
		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
	}
	if len(collected[peer1].cancels) != 1 {
		t.Fatal("Expected cancel to be sent for remaining want-block")
	}
}

func (s *sess) ID() uint64 {
	return s.id
}
func (s *sess) SignalAvailability(p peer.ID, isAvailable bool) {
	s.available[p] = isAvailable
}
259

dirkmc's avatar
dirkmc committed
260 261 262 263
type sess struct {
	id        uint64
	available map[peer.ID]bool
}
264

dirkmc's avatar
dirkmc committed
265 266 267 268 269 270 271 272 273 274
func newSess(id uint64) *sess {
	return &sess{id, make(map[peer.ID]bool)}
}

func TestSessionRegistration(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)

275 276
	tp := testutil.GeneratePeers(3)
	self, p1, p2 := tp[0], tp[1], tp[2]
dirkmc's avatar
dirkmc committed
277 278 279 280 281 282 283
	peerManager := New(ctx, peerQueueFactory, self)

	id := uint64(1)
	s := newSess(id)
	peerManager.RegisterSession(p1, s)
	if s.available[p1] {
		t.Fatal("Expected peer not be available till connected")
284
	}
285 286 287 288
	peerManager.RegisterSession(p2, s)
	if s.available[p2] {
		t.Fatal("Expected peer not be available till connected")
	}
289

dirkmc's avatar
dirkmc committed
290 291 292
	peerManager.Connected(p1, nil)
	if !s.available[p1] {
		t.Fatal("Expected signal callback")
293
	}
294 295 296 297
	peerManager.Connected(p2, nil)
	if !s.available[p2] {
		t.Fatal("Expected signal callback")
	}
298

dirkmc's avatar
dirkmc committed
299 300 301
	peerManager.Disconnected(p1)
	if s.available[p1] {
		t.Fatal("Expected signal callback")
302
	}
303 304 305
	if !s.available[p2] {
		t.Fatal("Expected signal callback only for disconnected peer")
	}
306

dirkmc's avatar
dirkmc committed
307 308 309 310 311
	peerManager.UnregisterSession(id)

	peerManager.Connected(p1, nil)
	if s.available[p1] {
		t.Fatal("Expected no signal callback (session unregistered)")
312 313
	}
}