peermanager_test.go 10.1 KB
Newer Older
1 2 3 4
package peermanager

import (
	"context"
5
	"math/rand"
6
	"testing"
7
	"time"
8

9
	"github.com/ipfs/go-bitswap/internal/testutil"
dirkmc's avatar
dirkmc committed
10
	cid "github.com/ipfs/go-cid"
11

Raúl Kripalani's avatar
Raúl Kripalani committed
12
	"github.com/libp2p/go-libp2p-core/peer"
13 14
)

dirkmc's avatar
dirkmc committed
15 16 17 18 19
type msg struct {
	p          peer.ID
	wantBlocks []cid.Cid
	wantHaves  []cid.Cid
	cancels    []cid.Cid
20 21
}

dirkmc's avatar
dirkmc committed
22 23 24
type mockPeerQueue struct {
	p    peer.ID
	msgs chan msg
25 26
}

dirkmc's avatar
dirkmc committed
27 28
// Startup and Shutdown are no-ops: the mock queue has no lifecycle to manage.
func (fp *mockPeerQueue) Startup()  {}
func (fp *mockPeerQueue) Shutdown() {}
hannahhoward's avatar
hannahhoward committed
29

dirkmc's avatar
dirkmc committed
30 31
// AddBroadcastWantHaves reports the broadcast want-haves on the msgs channel,
// tagged with this queue's peer. Want-blocks and cancels are left nil.
func (fp *mockPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {
	fp.msgs <- msg{p: fp.p, wantHaves: whs}
}
dirkmc's avatar
dirkmc committed
33 34 35 36 37
// AddWants reports the given want-blocks (wbs) and want-haves (whs) on the
// msgs channel, tagged with this queue's peer.
func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
	fp.msgs <- msg{p: fp.p, wantBlocks: wbs, wantHaves: whs}
}
// AddCancels reports the given cancels on the msgs channel, tagged with this
// queue's peer. Want-blocks and want-haves are left nil.
func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
	fp.msgs <- msg{p: fp.p, cancels: cs}
}
Dirk McCormick's avatar
Dirk McCormick committed
39
// ResponseReceived is a no-op: the mock does not record received responses.
func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {}
41

dirkmc's avatar
dirkmc committed
42 43 44 45 46 47 48 49
// peerWants is the per-peer total of everything collectMessages received:
// all want-haves, want-blocks and cancels seen for that peer.
type peerWants struct {
	wantHaves, wantBlocks, cancels []cid.Cid
}

func collectMessages(ch chan msg, timeout time.Duration) map[peer.ID]peerWants {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
50
	defer cancel()
dirkmc's avatar
dirkmc committed
51 52

	collected := make(map[peer.ID]peerWants)
53 54
	for {
		select {
dirkmc's avatar
dirkmc committed
55 56 57 58
		case m := <-ch:
			pw, ok := collected[m.p]
			if !ok {
				pw = peerWants{}
59
			}
dirkmc's avatar
dirkmc committed
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
			pw.wantHaves = append(pw.wantHaves, m.wantHaves...)
			pw.wantBlocks = append(pw.wantBlocks, m.wantBlocks...)
			pw.cancels = append(pw.cancels, m.cancels...)
			collected[m.p] = pw
		case <-ctx.Done():
			return collected
		}
	}
}

func makePeerQueueFactory(msgs chan msg) PeerQueueFactory {
	return func(ctx context.Context, p peer.ID) PeerQueue {
		return &mockPeerQueue{
			p:    p,
			msgs: msgs,
75 76 77 78
		}
	}
}

79 80
func TestAddingAndRemovingPeers(t *testing.T) {
	ctx := context.Background()
dirkmc's avatar
dirkmc committed
81 82
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
83

dirkmc's avatar
dirkmc committed
84 85 86
	tp := testutil.GeneratePeers(6)
	self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
	peerManager := New(ctx, peerQueueFactory, self)
87

88 89 90
	peerManager.Connected(peer1)
	peerManager.Connected(peer2)
	peerManager.Connected(peer3)
91 92 93

	connectedPeers := peerManager.ConnectedPeers()

94 95 96
	if !testutil.ContainsPeer(connectedPeers, peer1) ||
		!testutil.ContainsPeer(connectedPeers, peer2) ||
		!testutil.ContainsPeer(connectedPeers, peer3) {
97 98 99
		t.Fatal("Peers not connected that should be connected")
	}

100 101
	if testutil.ContainsPeer(connectedPeers, peer4) ||
		testutil.ContainsPeer(connectedPeers, peer5) {
102 103 104
		t.Fatal("Peers connected that shouldn't be connected")
	}

Dirk McCormick's avatar
Dirk McCormick committed
105
	// disconnect a peer
106 107 108
	peerManager.Disconnected(peer1)
	connectedPeers = peerManager.ConnectedPeers()

109
	if testutil.ContainsPeer(connectedPeers, peer1) {
110 111 112
		t.Fatal("Peer should have been disconnected but was not")
	}

Dirk McCormick's avatar
Dirk McCormick committed
113
	// reconnect peer
114
	peerManager.Connected(peer1)
115 116
	connectedPeers = peerManager.ConnectedPeers()

Dirk McCormick's avatar
Dirk McCormick committed
117 118
	if !testutil.ContainsPeer(connectedPeers, peer1) {
		t.Fatal("Peer should have been connected but was not")
119 120
	}
}
121

dirkmc's avatar
dirkmc committed
122 123 124 125 126 127 128 129
func TestBroadcastOnConnect(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(2)
	self, peer1 := tp[0], tp[1]
	peerManager := New(ctx, peerQueueFactory, self)
130

dirkmc's avatar
dirkmc committed
131
	cids := testutil.GenerateCids(2)
132
	peerManager.BroadcastWantHaves(ctx, cids)
133

dirkmc's avatar
dirkmc committed
134
	// Connect with two broadcast wants for first peer
135
	peerManager.Connected(peer1)
dirkmc's avatar
dirkmc committed
136
	collected := collectMessages(msgs, 2*time.Millisecond)
137

dirkmc's avatar
dirkmc committed
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	if len(collected[peer1].wantHaves) != 2 {
		t.Fatal("Expected want-haves to be sent to newly connected peer")
	}
}

func TestBroadcastWantHaves(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(3)
	self, peer1, peer2 := tp[0], tp[1], tp[2]
	peerManager := New(ctx, peerQueueFactory, self)

	cids := testutil.GenerateCids(3)

154 155 156 157 158
	// Broadcast the first two.
	peerManager.BroadcastWantHaves(ctx, cids[:2])

	// First peer should get them.
	peerManager.Connected(peer1)
dirkmc's avatar
dirkmc committed
159 160 161 162 163 164 165
	collected := collectMessages(msgs, 2*time.Millisecond)

	if len(collected[peer1].wantHaves) != 2 {
		t.Fatal("Expected want-haves to be sent to newly connected peer")
	}

	// Connect to second peer
166
	peerManager.Connected(peer2)
167

dirkmc's avatar
dirkmc committed
168 169 170 171 172 173 174
	// Send a broadcast to all peers, including cid that was already sent to
	// first peer
	peerManager.BroadcastWantHaves(ctx, []cid.Cid{cids[0], cids[2]})
	collected = collectMessages(msgs, 2*time.Millisecond)

	// One of the want-haves was already sent to peer1
	if len(collected[peer1].wantHaves) != 1 {
175 176
		t.Fatalf("Expected 1 want-haves to be sent to first peer, got %d",
			len(collected[peer1].wantHaves))
dirkmc's avatar
dirkmc committed
177
	}
178 179 180
	if len(collected[peer2].wantHaves) != 3 {
		t.Fatalf("Expected 3 want-haves to be sent to second peer, got %d",
			len(collected[peer2].wantHaves))
dirkmc's avatar
dirkmc committed
181 182 183 184 185 186 187 188 189 190 191 192
	}
}

func TestSendWants(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(2)
	self, peer1 := tp[0], tp[1]
	peerManager := New(ctx, peerQueueFactory, self)
	cids := testutil.GenerateCids(4)
193

194
	peerManager.Connected(peer1)
dirkmc's avatar
dirkmc committed
195 196
	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0]}, []cid.Cid{cids[2]})
	collected := collectMessages(msgs, 2*time.Millisecond)
197

dirkmc's avatar
dirkmc committed
198 199
	if len(collected[peer1].wantHaves) != 1 {
		t.Fatal("Expected want-have to be sent to peer")
200
	}
dirkmc's avatar
dirkmc committed
201 202 203 204 205 206
	if len(collected[peer1].wantBlocks) != 1 {
		t.Fatal("Expected want-block to be sent to peer")
	}

	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2], cids[3]})
	collected = collectMessages(msgs, 2*time.Millisecond)
207

dirkmc's avatar
dirkmc committed
208 209 210 211
	// First want-have and want-block should be filtered (because they were
	// already sent)
	if len(collected[peer1].wantHaves) != 1 {
		t.Fatal("Expected want-have to be sent to peer")
212
	}
dirkmc's avatar
dirkmc committed
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
	if len(collected[peer1].wantBlocks) != 1 {
		t.Fatal("Expected want-block to be sent to peer")
	}
}

func TestSendCancels(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)
	tp := testutil.GeneratePeers(3)
	self, peer1, peer2 := tp[0], tp[1], tp[2]
	peerManager := New(ctx, peerQueueFactory, self)
	cids := testutil.GenerateCids(4)

	// Connect to peer1 and peer2
229 230
	peerManager.Connected(peer1)
	peerManager.Connected(peer2)
dirkmc's avatar
dirkmc committed
231 232 233 234 235 236 237 238 239 240

	// Send 2 want-blocks and 1 want-have to peer1
	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

	// Clear messages
	collectMessages(msgs, 2*time.Millisecond)

	// Send cancels for 1 want-block and 1 want-have
	peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
	collected := collectMessages(msgs, 2*time.Millisecond)
241

dirkmc's avatar
dirkmc committed
242 243
	if _, ok := collected[peer2]; ok {
		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
244
	}
dirkmc's avatar
dirkmc committed
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
	if len(collected[peer1].cancels) != 2 {
		t.Fatal("Expected cancel to be sent for want-block and want-have sent to peer")
	}

	// Send cancels for all cids
	peerManager.SendCancels(ctx, cids)
	collected = collectMessages(msgs, 2*time.Millisecond)

	if _, ok := collected[peer2]; ok {
		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
	}
	if len(collected[peer1].cancels) != 1 {
		t.Fatal("Expected cancel to be sent for remaining want-block")
	}
}

// ID returns the session's id.
func (s *sess) ID() uint64 {
	return s.id
}
// SignalAvailability stores the latest availability reported for peer p in
// s.available, overwriting any earlier value for that peer.
func (s *sess) SignalAvailability(p peer.ID, isAvailable bool) {
	s.available[p] = isAvailable
}
267

dirkmc's avatar
dirkmc committed
268 269 270 271
// sess is a minimal session stub for tests: it has an id and remembers the
// availability last signalled for each peer.
type sess struct {
	id        uint64
	available map[peer.ID]bool // last isAvailable value passed to SignalAvailability, per peer
}
272

dirkmc's avatar
dirkmc committed
273 274 275 276 277 278 279 280 281 282
// newSess returns a sess with the given id and an empty, ready-to-write
// availability map.
func newSess(id uint64) *sess {
	return &sess{
		id:        id,
		available: make(map[peer.ID]bool),
	}
}

func TestSessionRegistration(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	msgs := make(chan msg, 16)
	peerQueueFactory := makePeerQueueFactory(msgs)

283 284
	tp := testutil.GeneratePeers(3)
	self, p1, p2 := tp[0], tp[1], tp[2]
dirkmc's avatar
dirkmc committed
285 286 287 288 289 290 291
	peerManager := New(ctx, peerQueueFactory, self)

	id := uint64(1)
	s := newSess(id)
	peerManager.RegisterSession(p1, s)
	if s.available[p1] {
		t.Fatal("Expected peer not be available till connected")
292
	}
293 294 295 296
	peerManager.RegisterSession(p2, s)
	if s.available[p2] {
		t.Fatal("Expected peer not be available till connected")
	}
297

298
	peerManager.Connected(p1)
dirkmc's avatar
dirkmc committed
299 300
	if !s.available[p1] {
		t.Fatal("Expected signal callback")
301
	}
302
	peerManager.Connected(p2)
303 304 305
	if !s.available[p2] {
		t.Fatal("Expected signal callback")
	}
306

dirkmc's avatar
dirkmc committed
307 308 309
	peerManager.Disconnected(p1)
	if s.available[p1] {
		t.Fatal("Expected signal callback")
310
	}
311 312 313
	if !s.available[p2] {
		t.Fatal("Expected signal callback only for disconnected peer")
	}
314

dirkmc's avatar
dirkmc committed
315 316
	peerManager.UnregisterSession(id)

317
	peerManager.Connected(p1)
dirkmc's avatar
dirkmc committed
318 319
	if s.available[p1] {
		t.Fatal("Expected no signal callback (session unregistered)")
320 321
	}
}
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379

// benchPeerQueue is a peer queue whose methods all do nothing. It is the
// queue type that BenchmarkPeerManager's factory returns.
type benchPeerQueue struct {
}

// Startup and Shutdown are no-ops.
func (*benchPeerQueue) Startup()  {}
func (*benchPeerQueue) Shutdown() {}

// All wants, cancels and response notifications are discarded.
func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid)   {}
func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
func (*benchPeerQueue) AddCancels(cs []cid.Cid)               {}
func (*benchPeerQueue) ResponseReceived(ks []cid.Cid)         {}

// BenchmarkPeerManager is a simplistic stress benchmark. It connects 500
// peers backed by no-op queues, then on each iteration randomly does one of:
// send 10 wants to a random peer (1 in 8), broadcast 30 want-haves (1 in 8),
// or cancel the oldest tenth of the outstanding wants (otherwise).
func BenchmarkPeerManager(b *testing.B) {
	// Keep setup out of the measurement.
	b.StopTimer()

	ctx := context.Background()

	newQueue := func(context.Context, peer.ID) PeerQueue {
		return &benchPeerQueue{}
	}

	self := testutil.GeneratePeers(1)[0]
	peers := testutil.GeneratePeers(500)
	peerManager := New(ctx, newQueue, self)

	// Create a bunch of connections.
	for _, p := range peers {
		peerManager.Connected(p)
	}

	// wanted holds every CID requested so far that has not been cancelled.
	var wanted []cid.Cid

	b.StartTimer()
	for n := 0; n < b.N; n++ {
		// Pick a random peer (drawn every iteration, even if unused, so the
		// random sequence stays the same).
		target := peers[rand.Intn(len(peers))]

		switch rand.Intn(8) {
		case 0:
			// A few targeted wants: 2 want-blocks and 8 want-haves.
			wants := testutil.GenerateCids(10)
			peerManager.SendWants(ctx, target, wants[:2], wants[2:])
			wanted = append(wanted, wants...)
		case 1:
			// Many broadcast want-haves.
			wants := testutil.GenerateCids(30)
			peerManager.BroadcastWantHaves(ctx, wants)
			wanted = append(wanted, wants...)
		default:
			// Cancel the oldest tenth of what is outstanding.
			limit := len(wanted) / 10
			cancel := wanted[:limit]
			wanted = wanted[limit:]
			peerManager.SendCancels(ctx, cancel)
		}
	}
}