sessionpeermanager_test.go 10.9 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"context"
hannahhoward's avatar
hannahhoward committed
5
	"math/rand"
6
	"sync"
7 8 9 10 11 12
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

16 17 18
type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
19 20
}

21
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
22 23
	peerCh := make(chan peer.ID)
	go func() {
24 25

		for _, p := range fppf.peers {
26 27 28
			select {
			case peerCh <- p:
			case <-ctx.Done():
29
				close(peerCh)
30 31 32
				return
			}
		}
33 34 35
		close(peerCh)

		select {
36
		case fppf.completed <- struct{}{}:
37 38
		case <-ctx.Done():
		}
39 40 41 42
	}()
	return peerCh
}

43
type fakePeerTagger struct {
44
	lk          sync.Mutex
45
	taggedPeers []peer.ID
46
	wait        sync.WaitGroup
47 48
}

49 50
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
51 52 53

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
54
	fpt.taggedPeers = append(fpt.taggedPeers, p)
55
}
56

57 58 59
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

60 61
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
62 63 64 65
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
66 67 68 69
			return
		}
	}
}
70

71 72 73 74 75 76
// count returns how many peers are currently recorded as tagged, reading the
// list while holding the tagger's lock.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	n := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return n
}

77 78 79 80 81 82 83 84 85
// getPeers asks the manager for its optimized peers and returns just their
// peer IDs, in the order the manager reported them. The result is nil when
// the manager reports no peers.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	var ids []peer.ID
	optimized := sessionPeerManager.GetOptimizedPeers()
	for i := range optimized {
		ids = append(ids, optimized[i].Peer)
	}
	return ids
}

86 87 88 89
func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
90 91
	completed := make(chan struct{})

92
	peers := testutil.GeneratePeers(5)
93 94
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
95 96 97
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

98
	sessionPeerManager := New(ctx, id, fpt, fppf)
99 100 101 102

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
103 104 105 106 107 108 109
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

110
	sessionPeers := getPeers(sessionPeerManager)
111 112 113 114 115 116 117 118
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
119
	if len(fpt.taggedPeers) != len(peers) {
120 121 122 123 124 125 126 127 128
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
129 130
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
131 132 133
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

134
	sessionPeerManager := New(ctx, id, fpt, fppf)
135
	sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c})
136
	time.Sleep(10 * time.Millisecond)
137
	sessionPeers := getPeers(sessionPeerManager)
138 139 140 141 142 143
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
144
	if len(fpt.taggedPeers) != 1 {
145 146 147 148
		t.Fatal("Peers was not tagged!")
	}
}

hannahhoward's avatar
hannahhoward committed
149 150
func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
151
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
152 153
	defer cancel()
	peers := testutil.GeneratePeers(100)
154
	completed := make(chan struct{})
155 156
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
157 158
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
159
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
160 161 162

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
163 164 165 166 167 168
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
169 170 171 172 173 174 175 176 177

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[rand.Intn(100)]
	peer2 := peers[rand.Intn(100)]
	peer3 := peers[rand.Intn(100)]
	time.Sleep(1 * time.Millisecond)
178
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
179
	time.Sleep(5 * time.Millisecond)
180
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
hannahhoward's avatar
hannahhoward committed
181
	time.Sleep(1 * time.Millisecond)
182
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
hannahhoward's avatar
hannahhoward committed
183 184 185 186 187 188

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

189
	// should prioritize peers which are fastest
190
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
191 192 193
		t.Fatal("Did not prioritize peers that received blocks")
	}

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should other peers rating of zero
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

212 213 214 215 216 217
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
218
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]})
hannahhoward's avatar
hannahhoward committed
219 220 221 222 223 224 225

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

226
	// should sort by average latency
227 228
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
229
		t.Fatal("Did not dedup peers which received multiple blocks")
hannahhoward's avatar
hannahhoward committed
230 231 232 233 234
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
235
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
236 237 238 239 240 241 242
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}
243

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
func TestTimeoutsAndCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(3)
	completed := make(chan struct{})
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
	sessionPeerManager := New(ctx, id, fpt, fppf)

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

	sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[0]
	peer2 := peers[1]
	peer3 := peers[2]
	time.Sleep(1 * time.Millisecond)
275
	sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
276
	time.Sleep(2 * time.Millisecond)
277
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
278
	time.Sleep(40 * time.Millisecond)
279
	sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324

	sessionPeers := sessionPeerManager.GetOptimizedPeers()

	// should prioritize peers which are fastest
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
		t.Fatal("Did not prioritize peers that received blocks")
	}

	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should not record a response for a broadcast return that arrived AFTER the timeout period
	// leaving peer unoptimized
	if sessionPeers[2].OptimizationRating != 0 {
		t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
	}

	// now we make a targeted request, which SHOULD affect peer
	// rating if it times out
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}

	// now we make a targeted request, but later cancel it
	// timing out should not affect rating
	c3 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
325
	sessionPeerManager.RecordCancels([]cid.Cid{c3[0]})
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should not have affected optimization rating but did")
	}

	// if we make a targeted request that is then cancelled, but we still
	// receive the block before the timeout, it's worth recording and affecting latency

	c4 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
342
	sessionPeerManager.RecordCancels([]cid.Cid{c4[0]})
343
	time.Sleep(2 * time.Millisecond)
344
	sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]})
345
	time.Sleep(2 * time.Millisecond)
346 347 348 349 350 351

	// call again
	fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}
352 353 354 355 356

	// ensure all peer latency tracking has been cleaned up
	if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 {
		t.Fatal("Latency request tracking should have been cleaned up but was not")
	}
357 358
}

359 360 361 362 363
func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	peers := testutil.GeneratePeers(5)
364
	completed := make(chan struct{})
365 366
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
367 368 369
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

370
	sessionPeerManager := New(ctx, id, fpt, fppf)
371 372

	sessionPeerManager.FindMorePeers(ctx, c)
373 374 375 376 377 378 379
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

380
	if fpt.count() != len(peers) {
381 382 383
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
384
	fpt.wait.Wait()
385

386
	if fpt.count() != 0 {
387 388 389
		t.Fatal("Peers were not untagged!")
	}
}