sessionpeermanager_test.go 10.6 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"context"
hannahhoward's avatar
hannahhoward committed
5
	"math/rand"
6
	"sync"
7 8 9 10 11 12
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

16 17 18
type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
19 20
}

21
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
22 23
	peerCh := make(chan peer.ID)
	go func() {
24 25

		for _, p := range fppf.peers {
26 27 28
			select {
			case peerCh <- p:
			case <-ctx.Done():
29
				close(peerCh)
30 31 32
				return
			}
		}
33 34 35
		close(peerCh)

		select {
36
		case fppf.completed <- struct{}{}:
37 38
		case <-ctx.Done():
		}
39 40 41 42
	}()
	return peerCh
}

43
type fakePeerTagger struct {
44
	lk          sync.Mutex
45
	taggedPeers []peer.ID
46
	wait        sync.WaitGroup
47 48
}

49 50
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
51 52 53

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
54
	fpt.taggedPeers = append(fpt.taggedPeers, p)
55
}
56

57 58 59
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

60 61
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
62 63 64 65
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
66 67 68 69
			return
		}
	}
}
70

71 72 73 74 75 76
// count reports how many peers are currently recorded as tagged. It takes
// fpt.lk, so it is safe to call while TagPeer/UntagPeer run concurrently.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	n := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return n
}

77 78 79 80 81 82 83 84 85
// getPeers asks the manager for its optimized peers and returns just their
// peer IDs, in the order the manager reported them. The result is nil when
// the manager reports no peers.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	var ids []peer.ID
	optimized := sessionPeerManager.GetOptimizedPeers()
	for i := range optimized {
		ids = append(ids, optimized[i].Peer)
	}
	return ids
}

86 87 88 89
func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
90 91
	completed := make(chan struct{})

92
	peers := testutil.GeneratePeers(5)
93 94
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
95 96 97
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

98
	sessionPeerManager := New(ctx, id, fpt, fppf)
99 100 101 102

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
103 104 105 106 107 108 109
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

110
	sessionPeers := getPeers(sessionPeerManager)
111 112 113 114 115 116 117 118
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
119
	if len(fpt.taggedPeers) != len(peers) {
120 121 122 123 124 125 126 127 128
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
129 130
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
131 132 133
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

134
	sessionPeerManager := New(ctx, id, fpt, fppf)
135 136
	sessionPeerManager.RecordPeerResponse(p, c)
	time.Sleep(10 * time.Millisecond)
137
	sessionPeers := getPeers(sessionPeerManager)
138 139 140 141 142 143
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
144
	if len(fpt.taggedPeers) != 1 {
145 146 147 148
		t.Fatal("Peers was not tagged!")
	}
}

hannahhoward's avatar
hannahhoward committed
149 150
func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
151
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
152 153
	defer cancel()
	peers := testutil.GeneratePeers(100)
154
	completed := make(chan struct{})
155 156
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
157 158
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
159
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
160 161 162

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
163 164 165 166 167 168
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
169 170 171 172 173 174 175 176 177 178

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[rand.Intn(100)]
	peer2 := peers[rand.Intn(100)]
	peer3 := peers[rand.Intn(100)]
	time.Sleep(1 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer1, c[0])
179
	time.Sleep(5 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
180 181 182 183 184 185 186 187 188
	sessionPeerManager.RecordPeerResponse(peer2, c[0])
	time.Sleep(1 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer3, c[0])

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

189
	// should prioritize peers which are fastest
190
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
191 192 193
		t.Fatal("Did not prioritize peers that received blocks")
	}

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should other peers rating of zero
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

212 213 214 215 216 217 218
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
	sessionPeerManager.RecordPeerResponse(peer3, c2[0])
hannahhoward's avatar
hannahhoward committed
219 220 221 222 223 224 225

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

226
	// should sort by average latency
227 228
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
229
		t.Fatal("Did not dedup peers which received multiple blocks")
hannahhoward's avatar
hannahhoward committed
230 231 232 233 234
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
235
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
236 237 238 239 240 241 242
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}
243

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
// TestTimeoutsAndCancels checks how request timeouts and cancellations feed
// into peer optimization ratings:
//
//   - a broadcast response arriving after the timeout leaves the peer unrated;
//   - a targeted request that times out lowers the peer's rating;
//   - a targeted request that is cancelled and then times out does not;
//   - a cancelled request that is still answered before the timeout raises it.
func TestTimeoutsAndCancels(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	peers := testutil.GeneratePeers(3)
	completed := make(chan struct{})
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
	sessionPeerManager := New(ctx, id, fpt, fppf)

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

	sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[0]
	peer2 := peers[1]
	peer3 := peers[2]
	time.Sleep(1 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer1, c[0])
	time.Sleep(2 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer2, c[0])
	// 40ms exceeds the 20ms timeout set above, so peer3 answers too late.
	time.Sleep(40 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer3, c[0])

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	// Fail clearly rather than panic on the index expressions below.
	if len(sessionPeers) < len(peers) {
		t.Fatal("optimized peers list is shorter than the number of session peers")
	}

	// should prioritize peers which are fastest
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
		t.Fatal("Did not prioritize peers that received blocks")
	}

	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should not record a response for a broadcast return that arrived AFTER the timeout period
	// leaving peer unoptimized
	if sessionPeers[2].OptimizationRating != 0 {
		t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
	}

	// now we make a targeted request, which SHOULD affect peer
	// rating if it times out
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) < 2 {
		t.Fatal("optimized peers list is too short after targeted request timeout")
	}
	if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should have affected optimization rating but did not")
	}

	// now we make a targeted request, but later cancel it
	// timing out should not affect rating
	c3 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
	sessionPeerManager.RecordCancel(c3[0])
	// wait for a timeout
	time.Sleep(40 * time.Millisecond)

	// call again
	thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(thirdSessionPeers) < 2 {
		t.Fatal("optimized peers list is too short after cancelled request")
	}
	if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
		t.Fatal("Timeout should not have affected optimization rating but did")
	}

	// if we make a targeted request that is then cancelled, but we still
	// receive the block before the timeout, it's worth recording and affecting latency

	c4 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
	sessionPeerManager.RecordCancel(c4[0])
	time.Sleep(2 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer2, c4[0])

	// call again
	fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(fourthSessionPeers) < 2 {
		t.Fatal("optimized peers list is too short after response to cancelled request")
	}
	// The rating is expected to go UP here: the response arrived in time
	// even though the request had been cancelled.
	if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
		t.Fatal("Response received after cancel but before timeout should have improved optimization rating but did not")
	}
}

353 354 355 356 357
func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	peers := testutil.GeneratePeers(5)
358
	completed := make(chan struct{})
359 360
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
361 362 363
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

364
	sessionPeerManager := New(ctx, id, fpt, fppf)
365 366

	sessionPeerManager.FindMorePeers(ctx, c)
367 368 369 370 371 372 373
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

374
	if fpt.count() != len(peers) {
375 376 377
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
378
	fpt.wait.Wait()
379

380
	if fpt.count() != 0 {
381 382 383
		t.Fatal("Peers were not untagged!")
	}
}