package sessionpeermanager

import (
	"context"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
19 20
}

func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
22 23
	peerCh := make(chan peer.ID)
	go func() {
24 25

		for _, p := range fppf.peers {
26 27 28
			select {
			case peerCh <- p:
			case <-ctx.Done():
29
				close(peerCh)
30 31 32
				return
			}
		}
33 34 35
		close(peerCh)

		select {
36
		case fppf.completed <- struct{}{}:
37 38
		case <-ctx.Done():
		}
39 40 41 42
	}()
	return peerCh
}

type fakePeerTagger struct {
44
	lk          sync.Mutex
45
	taggedPeers []peer.ID
46
	wait        sync.WaitGroup
47 48
}

func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
51 52 53

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
54
	fpt.taggedPeers = append(fpt.taggedPeers, p)
55
}

func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

60 61
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
62 63 64 65
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
66 67 68 69
			return
		}
	}
}

// count reports how many peers are currently tagged. The read is done while
// holding fpt.lk.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	n := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return n
}

// getPeers asks the manager for its optimized peers and returns only their
// peer IDs, in the order the manager reported them. The result is nil when
// the manager reports no peers.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	var ids []peer.ID
	optimized := sessionPeerManager.GetOptimizedPeers()
	for i := range optimized {
		ids = append(ids, optimized[i].Peer)
	}
	return ids
}

func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
90 91
	completed := make(chan struct{})

92
	peers := testutil.GeneratePeers(5)
93 94
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
95 96 97
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

98
	sessionPeerManager := New(ctx, id, fpt, fppf)
99 100 101 102

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
103 104 105 106 107 108 109
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

110
	sessionPeers := getPeers(sessionPeerManager)
111 112 113 114 115 116 117 118
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
119
	if len(fpt.taggedPeers) != len(peers) {
120 121 122 123 124 125 126 127 128
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
129 130
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
131 132 133
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

134
	sessionPeerManager := New(ctx, id, fpt, fppf)
135
	sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c})
136
	time.Sleep(10 * time.Millisecond)
137
	sessionPeers := getPeers(sessionPeerManager)
138 139 140 141 142 143
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
144
	if len(fpt.taggedPeers) != 1 {
145 146 147 148
		t.Fatal("Peers was not tagged!")
	}
}

func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
151
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
152
	defer cancel()
153 154
	peerCount := 100
	peers := testutil.GeneratePeers(peerCount)
155
	completed := make(chan struct{})
156 157
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
158 159
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
160
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
161 162 163

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
164 165 166 167 168
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
169
	time.Sleep(20 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
170 171 172 173 174

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
175 176 177 178
	randi := rand.Perm(peerCount)
	peer1 := peers[randi[0]]
	peer2 := peers[randi[1]]
	peer3 := peers[randi[2]]
179
	time.Sleep(10 * time.Millisecond)
180
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
181
	time.Sleep(50 * time.Millisecond)
182
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
183
	time.Sleep(10 * time.Millisecond)
184
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
hannahhoward's avatar
hannahhoward committed
185 186 187 188 189 190

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

191
	// should prioritize peers which are fastest
192
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
193 194 195
		t.Fatal("Did not prioritize peers that received blocks")
	}

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should other peers rating of zero
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

214 215 216 217 218 219
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
220
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]})
hannahhoward's avatar
hannahhoward committed
221 222 223 224 225 226 227

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

228
	// should sort by average latency
229 230
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
231
		t.Fatal("Did not correctly update order of peers sorted by average latency")
hannahhoward's avatar
hannahhoward committed
232 233 234 235 236
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
237
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
238 239 240 241 242 243 244
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}

func TestTimeoutsAndCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(3)
	completed := make(chan struct{})
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
	sessionPeerManager := New(ctx, id, fpt, fppf)

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

	sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[0]
	peer2 := peers[1]
	peer3 := peers[2]
	time.Sleep(1 * time.Millisecond)
277
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
278
	time.Sleep(2 * time.Millisecond)
279
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
280
	time.Sleep(40 * time.Millisecond)
281
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326

	sessionPeers := sessionPeerManager.GetOptimizedPeers()

	// should prioritize peers which are fastest
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
		t.Fatal("Did not prioritize peers that received blocks")
	}

	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should not record a response for a broadcast return that arrived AFTER the timeout period
	// leaving peer unoptimized
	if sessionPeers[2].OptimizationRating != 0 {
		t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
	}

	// now we make a targeted request, which SHOULD affect peer
	// rating if it times out
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}

	// now we make a targeted request, but later cancel it
	// timing out should not affect rating
	c3 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
327
	sessionPeerManager.RecordCancels([]cid.Cid{c3[0]})
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should not have affected optimization rating but did")
	}

	// if we make a targeted request that is then cancelled, but we still
	// receive the block before the timeout, it's worth recording and affecting latency

	c4 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
344
	sessionPeerManager.RecordCancels([]cid.Cid{c4[0]})
345
	time.Sleep(2 * time.Millisecond)
346
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]})
347
	time.Sleep(2 * time.Millisecond)
348 349 350 351 352 353

	// call again
	fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}
354 355 356 357 358

	// ensure all peer latency tracking has been cleaned up
	if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 {
		t.Fatal("Latency request tracking should have been cleaned up but was not")
	}
359 360
}

func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
363
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
364 365
	defer cancel()
	peers := testutil.GeneratePeers(5)
366
	completed := make(chan struct{})
367 368
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
369 370 371
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

372
	sessionPeerManager := New(ctx, id, fpt, fppf)
373 374

	sessionPeerManager.FindMorePeers(ctx, c)
375 376 377 378 379
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
380
	time.Sleep(15 * time.Millisecond)
381

382
	if fpt.count() != len(peers) {
383 384 385
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
386
	fpt.wait.Wait()
387

388
	if fpt.count() != 0 {
389 390 391
		t.Fatal("Peers were not untagged!")
	}
}