sessionpeermanager_test.go 11.3 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"context"
5
	"fmt"
hannahhoward's avatar
hannahhoward committed
6
	"math/rand"
7
	"sync"
8 9 10 11 12 13
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
15 16
)

17 18 19
type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
20 21
}

22
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
23 24
	peerCh := make(chan peer.ID)
	go func() {
25 26

		for _, p := range fppf.peers {
27 28 29
			select {
			case peerCh <- p:
			case <-ctx.Done():
30
				close(peerCh)
31 32 33
				return
			}
		}
34 35 36
		close(peerCh)

		select {
37
		case fppf.completed <- struct{}{}:
38 39
		case <-ctx.Done():
		}
40 41 42 43
	}()
	return peerCh
}

44
type fakePeerTagger struct {
45
	lk          sync.Mutex
46
	taggedPeers []peer.ID
47
	wait        sync.WaitGroup
48 49
}

50 51
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
52 53 54

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
55
	fpt.taggedPeers = append(fpt.taggedPeers, p)
56
}
57

58 59 60
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

61 62
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
63 64 65 66
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
67 68 69 70
			return
		}
	}
}
71

72 73 74 75 76 77
// count reports how many peers are currently tagged, reading the list under
// the tagger's lock.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	tagged := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return tagged
}

78 79 80 81 82 83 84 85 86
// getPeers returns just the peer IDs of the manager's optimized peers, in the
// order GetOptimizedPeers reported them. The result is nil when there are none.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	var ids []peer.ID
	optimized := sessionPeerManager.GetOptimizedPeers()
	for i := range optimized {
		ids = append(ids, optimized[i].Peer)
	}
	return ids
}

87 88 89 90
func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
91 92
	completed := make(chan struct{})

93
	peers := testutil.GeneratePeers(5)
94 95
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
96 97 98
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

99
	sessionPeerManager := New(ctx, id, fpt, fppf)
100 101 102 103

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
104 105 106 107 108 109 110
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

111
	sessionPeers := getPeers(sessionPeerManager)
112 113 114 115 116 117 118 119
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
120
	if len(fpt.taggedPeers) != len(peers) {
121 122 123 124 125 126 127 128 129
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
130 131
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
132 133 134
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

135
	sessionPeerManager := New(ctx, id, fpt, fppf)
136
	sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c})
137
	time.Sleep(10 * time.Millisecond)
138
	sessionPeers := getPeers(sessionPeerManager)
139 140 141 142 143 144
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
145
	if len(fpt.taggedPeers) != 1 {
146 147 148 149
		t.Fatal("Peers was not tagged!")
	}
}

hannahhoward's avatar
hannahhoward committed
150 151
func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
152
	ctx, cancel := context.WithTimeout(ctx, 60*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
153
	defer cancel()
154 155
	peerCount := 100
	peers := testutil.GeneratePeers(peerCount)
156
	completed := make(chan struct{})
157 158
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
159 160
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
161
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
162 163 164

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
165 166 167 168 169
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
170
	time.Sleep(5 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
171 172 173 174 175

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
176 177 178 179
	randi := rand.Perm(peerCount)
	peer1 := peers[randi[0]]
	peer2 := peers[randi[1]]
	peer3 := peers[randi[2]]
180
	time.Sleep(5 * time.Millisecond)
181
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
182
	time.Sleep(25 * time.Millisecond)
183
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
184
	time.Sleep(5 * time.Millisecond)
185
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
hannahhoward's avatar
hannahhoward committed
186 187 188

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
189
		t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers))
hannahhoward's avatar
hannahhoward committed
190 191
	}

192
	// should prioritize peers which are fastest
193 194 195
	// peer1: ~5ms
	// peer2: 5 + 25 = ~30ms
	// peer3: 5 + 25 + 5 = ~35ms
196
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
197 198 199
		t.Fatal("Did not prioritize peers that received blocks")
	}

200 201 202 203 204 205 206 207 208 209 210
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

211
	// should give other non-optimized peers rating of zero
212 213 214 215 216 217
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

218 219 220 221 222 223
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
224
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]})
hannahhoward's avatar
hannahhoward committed
225 226 227 228

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
229
		t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers))
hannahhoward's avatar
hannahhoward committed
230 231
	}

232
	// should sort by average latency
233
	// peer1: ~5ms
234
	// peer3: (~35ms + ~5ms) / 2 = ~20ms
235
	// peer2: ~30ms
236 237
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
238
		t.Fatal("Did not correctly update order of peers sorted by average latency")
hannahhoward's avatar
hannahhoward committed
239 240 241 242 243
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
244
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
245 246 247 248 249 250 251
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
func TestTimeoutsAndCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(3)
	completed := make(chan struct{})
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
	sessionPeerManager := New(ctx, id, fpt, fppf)

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

	sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[0]
	peer2 := peers[1]
	peer3 := peers[2]
	time.Sleep(1 * time.Millisecond)
284
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
285
	time.Sleep(2 * time.Millisecond)
286
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
287
	time.Sleep(40 * time.Millisecond)
288
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333

	sessionPeers := sessionPeerManager.GetOptimizedPeers()

	// should prioritize peers which are fastest
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
		t.Fatal("Did not prioritize peers that received blocks")
	}

	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should not record a response for a broadcast return that arrived AFTER the timeout period
	// leaving peer unoptimized
	if sessionPeers[2].OptimizationRating != 0 {
		t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
	}

	// now we make a targeted request, which SHOULD affect peer
	// rating if it times out
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}

	// now we make a targeted request, but later cancel it
	// timing out should not affect rating
	c3 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
334
	sessionPeerManager.RecordCancels([]cid.Cid{c3[0]})
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should not have affected optimization rating but did")
	}

	// if we make a targeted request that is then cancelled, but we still
	// receive the block before the timeout, it's worth recording and affecting latency

	c4 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
351
	sessionPeerManager.RecordCancels([]cid.Cid{c4[0]})
352
	time.Sleep(2 * time.Millisecond)
353
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]})
354
	time.Sleep(2 * time.Millisecond)
355 356 357 358 359 360

	// call again
	fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}
361 362 363 364 365

	// ensure all peer latency tracking has been cleaned up
	if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 {
		t.Fatal("Latency request tracking should have been cleaned up but was not")
	}
366 367
}

368 369
func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
370
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
371 372
	defer cancel()
	peers := testutil.GeneratePeers(5)
373
	completed := make(chan struct{})
374 375
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
376 377 378
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

379
	sessionPeerManager := New(ctx, id, fpt, fppf)
380 381

	sessionPeerManager.FindMorePeers(ctx, c)
382 383 384 385 386
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
387
	time.Sleep(15 * time.Millisecond)
388

389
	if fpt.count() != len(peers) {
390 391 392
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
393
	fpt.wait.Wait()
394

395
	if fpt.count() != 0 {
396 397 398
		t.Fatal("Peers were not untagged!")
	}
}