sessionpeermanager_test.go 11.4 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"context"
5
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sync"
8 9 10 11 12 13
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
15 16
)

17 18 19
type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
20 21
}

22
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
23 24
	peerCh := make(chan peer.ID)
	go func() {
25 26

		for _, p := range fppf.peers {
27 28 29
			select {
			case peerCh <- p:
			case <-ctx.Done():
30
				close(peerCh)
31 32 33
				return
			}
		}
34 35 36
		close(peerCh)

		select {
37
		case fppf.completed <- struct{}{}:
38 39
		case <-ctx.Done():
		}
40 41 42 43
	}()
	return peerCh
}

44
type fakePeerTagger struct {
45
	lk          sync.Mutex
46
	taggedPeers []peer.ID
47
	wait        sync.WaitGroup
48 49
}

50 51
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
52 53 54

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
55
	fpt.taggedPeers = append(fpt.taggedPeers, p)
56
}
57

58 59 60
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

61 62
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
63 64 65 66
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
67 68 69 70
			return
		}
	}
}
71

72 73 74 75 76 77
// count reports how many peers are currently tagged. The length is read
// while holding fpt.lk.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	tagged := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return tagged
}

78 79 80 81 82 83 84 85 86
// getPeers returns the Peer field of every entry reported by
// GetOptimizedPeers, in the same order. The result is nil when the manager
// reports no peers.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	var ids []peer.ID
	for _, entry := range sessionPeerManager.GetOptimizedPeers() {
		ids = append(ids, entry.Peer)
	}
	return ids
}

87 88 89 90
func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
91 92
	completed := make(chan struct{})

93
	peers := testutil.GeneratePeers(5)
94 95
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
96 97 98
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

99
	sessionPeerManager := New(ctx, id, fpt, fppf)
100 101 102 103

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
104 105 106 107 108 109 110
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

111
	sessionPeers := getPeers(sessionPeerManager)
112 113 114 115 116 117 118 119
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
120
	if len(fpt.taggedPeers) != len(peers) {
121 122 123 124 125 126 127 128 129
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
130 131
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
132 133 134
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

135
	sessionPeerManager := New(ctx, id, fpt, fppf)
136
	sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c})
137
	time.Sleep(10 * time.Millisecond)
138
	sessionPeers := getPeers(sessionPeerManager)
139 140 141 142 143 144
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
145
	if len(fpt.taggedPeers) != 1 {
146 147 148 149
		t.Fatal("Peers was not tagged!")
	}
}

hannahhoward's avatar
hannahhoward committed
150 151
func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
152
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
153
	defer cancel()
154 155
	peerCount := 100
	peers := testutil.GeneratePeers(peerCount)
156
	completed := make(chan struct{})
157 158
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
159 160
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
161
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
162 163 164

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
165 166 167 168 169
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
170
	time.Sleep(20 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
171 172 173 174 175

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
176 177 178 179
	randi := rand.Perm(peerCount)
	peer1 := peers[randi[0]]
	peer2 := peers[randi[1]]
	peer3 := peers[randi[2]]
180
	time.Sleep(5 * time.Millisecond)
181
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
182
	time.Sleep(25 * time.Millisecond)
183
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
184
	time.Sleep(5 * time.Millisecond)
185
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
hannahhoward's avatar
hannahhoward committed
186

187 188
	time.Sleep(5 * time.Millisecond)

hannahhoward's avatar
hannahhoward committed
189 190
	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
191
		t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers))
hannahhoward's avatar
hannahhoward committed
192 193
	}

194
	// should prioritize peers which are fastest
195 196 197
	// peer1: ~5ms
	// peer2: 5 + 25 = ~30ms
	// peer3: 5 + 25 + 5 = ~35ms
198
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
199 200 201
		t.Fatal("Did not prioritize peers that received blocks")
	}

202 203 204 205 206 207 208 209 210 211 212
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

213
	// should give other non-optimized peers rating of zero
214 215 216 217 218 219
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

220 221 222 223 224 225
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
226
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]})
hannahhoward's avatar
hannahhoward committed
227

228 229
	time.Sleep(5 * time.Millisecond)

hannahhoward's avatar
hannahhoward committed
230 231 232
	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
233
		t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers))
hannahhoward's avatar
hannahhoward committed
234 235
	}

236
	// should sort by average latency
237 238 239
	// peer1: ~5ms
	// peer3: (~35ms + ~5ms + ~5ms) / 2 = ~23ms
	// peer2: ~30ms
240 241
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
242
		t.Fatal("Did not correctly update order of peers sorted by average latency")
hannahhoward's avatar
hannahhoward committed
243 244 245 246 247
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
248
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
249 250 251 252 253 254 255
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}
256

257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
func TestTimeoutsAndCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(3)
	completed := make(chan struct{})
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
	sessionPeerManager := New(ctx, id, fpt, fppf)

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

	sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[0]
	peer2 := peers[1]
	peer3 := peers[2]
	time.Sleep(1 * time.Millisecond)
288
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
289
	time.Sleep(2 * time.Millisecond)
290
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
291
	time.Sleep(40 * time.Millisecond)
292
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337

	sessionPeers := sessionPeerManager.GetOptimizedPeers()

	// should prioritize peers which are fastest
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
		t.Fatal("Did not prioritize peers that received blocks")
	}

	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should not record a response for a broadcast return that arrived AFTER the timeout period
	// leaving peer unoptimized
	if sessionPeers[2].OptimizationRating != 0 {
		t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
	}

	// now we make a targeted request, which SHOULD affect peer
	// rating if it times out
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}

	// now we make a targeted request, but later cancel it
	// timing out should not affect rating
	c3 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
338
	sessionPeerManager.RecordCancels([]cid.Cid{c3[0]})
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should not have affected optimization rating but did")
	}

	// if we make a targeted request that is then cancelled, but we still
	// receive the block before the timeout, it's worth recording and affecting latency

	c4 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
355
	sessionPeerManager.RecordCancels([]cid.Cid{c4[0]})
356
	time.Sleep(2 * time.Millisecond)
357
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]})
358
	time.Sleep(2 * time.Millisecond)
359 360 361 362 363 364

	// call again
	fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}
365 366 367 368 369

	// ensure all peer latency tracking has been cleaned up
	if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 {
		t.Fatal("Latency request tracking should have been cleaned up but was not")
	}
370 371
}

372 373
func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
374
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
375 376
	defer cancel()
	peers := testutil.GeneratePeers(5)
377
	completed := make(chan struct{})
378 379
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
380 381 382
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

383
	sessionPeerManager := New(ctx, id, fpt, fppf)
384 385

	sessionPeerManager.FindMorePeers(ctx, c)
386 387 388 389 390
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
391
	time.Sleep(15 * time.Millisecond)
392

393
	if fpt.count() != len(peers) {
394 395 396
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
397
	fpt.wait.Wait()
398

399
	if fpt.count() != 0 {
400 401 402
		t.Fatal("Peers were not untagged!")
	}
}