Commit fafdaaec authored by Dirk McCormick's avatar Dirk McCormick

test: fix session peer manager tests

parent 369b794b
package sessionpeermanager package sessionpeermanager
import ( import (
"context"
"fmt"
"math/rand"
"sync" "sync"
"testing" "testing"
"time"
"github.com/ipfs/go-bitswap/internal/testutil" "github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
) )
type fakePeerProviderFinder struct {
peers []peer.ID
completed chan struct{}
}
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
for _, p := range fppf.peers {
select {
case peerCh <- p:
case <-ctx.Done():
close(peerCh)
return
}
}
close(peerCh)
select {
case fppf.completed <- struct{}{}:
case <-ctx.Done():
}
}()
return peerCh
}
type fakePeerTagger struct { type fakePeerTagger struct {
lk sync.Mutex lk sync.Mutex
taggedPeers []peer.ID taggedPeers []peer.ID
...@@ -75,324 +42,230 @@ func (fpt *fakePeerTagger) count() int { ...@@ -75,324 +42,230 @@ func (fpt *fakePeerTagger) count() int {
return len(fpt.taggedPeers) return len(fpt.taggedPeers)
} }
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID { // func TestFindingMorePeers(t *testing.T) {
optimizedPeers := sessionPeerManager.GetOptimizedPeers() // ctx := context.Background()
var peers []peer.ID // ctx, cancel := context.WithCancel(ctx)
for _, optimizedPeer := range optimizedPeers { // defer cancel()
peers = append(peers, optimizedPeer.Peer) // completed := make(chan struct{})
// peers := testutil.GeneratePeers(5)
// fpt := &fakePeerTagger{}
// fppf := &fakePeerProviderFinder{peers, completed}
// c := testutil.GenerateCids(1)[0]
// id := testutil.GenerateSessionID()
// sessionPeerManager := New(ctx, id, fpt, fppf)
// findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
// defer findCancel()
// sessionPeerManager.FindMorePeers(ctx, c)
// select {
// case <-completed:
// case <-findCtx.Done():
// t.Fatal("Did not finish finding providers")
// }
// time.Sleep(2 * time.Millisecond)
// sessionPeers := getPeers(sessionPeerManager)
// if len(sessionPeers) != len(peers) {
// t.Fatal("incorrect number of peers found")
// }
// for _, p := range sessionPeers {
// if !testutil.ContainsPeer(peers, p) {
// t.Fatal("incorrect peer found through finding providers")
// }
// }
// if len(fpt.taggedPeers) != len(peers) {
// t.Fatal("Peers were not tagged!")
// }
// }
func TestAddPeers(t *testing.T) {
peers := testutil.GeneratePeers(2)
spm := New(1, &fakePeerTagger{})
isNew := spm.AddPeer(peers[0])
if !isNew {
t.Fatal("Expected peer to be new")
}
isNew = spm.AddPeer(peers[0])
if isNew {
t.Fatal("Expected peer to no longer be new")
}
isNew = spm.AddPeer(peers[1])
if !isNew {
t.Fatal("Expected peer to be new")
} }
return peers
} }
func TestFindingMorePeers(t *testing.T) { func TestRemovePeers(t *testing.T) {
ctx := context.Background() peers := testutil.GeneratePeers(2)
ctx, cancel := context.WithCancel(ctx) spm := New(1, &fakePeerTagger{})
defer cancel()
completed := make(chan struct{})
peers := testutil.GeneratePeers(5)
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond) existed := spm.RemovePeer(peers[0])
defer findCancel() if existed {
sessionPeerManager.FindMorePeers(ctx, c) t.Fatal("Expected peer not to exist")
select {
case <-completed:
case <-findCtx.Done():
t.Fatal("Did not finish finding providers")
} }
time.Sleep(2 * time.Millisecond)
sessionPeers := getPeers(sessionPeerManager) spm.AddPeer(peers[0])
if len(sessionPeers) != len(peers) { spm.AddPeer(peers[1])
t.Fatal("incorrect number of peers found")
}
for _, p := range sessionPeers {
if !testutil.ContainsPeer(peers, p) {
t.Fatal("incorrect peer found through finding providers")
}
}
if len(fpt.taggedPeers) != len(peers) {
t.Fatal("Peers were not tagged!")
}
}
func TestRecordingReceivedBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p := testutil.GeneratePeers(1)[0]
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf) existed = spm.RemovePeer(peers[0])
sessionPeerManager.RecordPeerResponse(p, []cid.Cid{c}) if !existed {
time.Sleep(10 * time.Millisecond) t.Fatal("Expected peer to exist")
sessionPeers := getPeers(sessionPeerManager)
if len(sessionPeers) != 1 {
t.Fatal("did not add peer on receive")
} }
if sessionPeers[0] != p { existed = spm.RemovePeer(peers[1])
t.Fatal("incorrect peer added on receive") if !existed {
t.Fatal("Expected peer to exist")
} }
if len(fpt.taggedPeers) != 1 { existed = spm.RemovePeer(peers[0])
t.Fatal("Peers was not tagged!") if existed {
t.Fatal("Expected peer not to have existed")
} }
} }
func TestOrderingPeers(t *testing.T) { func TestHasPeers(t *testing.T) {
ctx := context.Background() peers := testutil.GeneratePeers(2)
ctx, cancel := context.WithTimeout(ctx, 60*time.Millisecond) spm := New(1, &fakePeerTagger{})
defer cancel()
peerCount := 100
peers := testutil.GeneratePeers(peerCount)
completed := make(chan struct{})
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
time.Sleep(5 * time.Millisecond)
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
// record receives
randi := rand.Perm(peerCount)
peer1 := peers[randi[0]]
peer2 := peers[randi[1]]
peer3 := peers[randi[2]]
time.Sleep(5 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
time.Sleep(25 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
time.Sleep(5 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
sessionPeers := sessionPeerManager.GetOptimizedPeers() if spm.HasPeers() {
if len(sessionPeers) != maxOptimizedPeers { t.Fatal("Expected not to have peers yet")
t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(sessionPeers), maxOptimizedPeers))
} }
// should prioritize peers which are fastest spm.AddPeer(peers[0])
// peer1: ~5ms if !spm.HasPeers() {
// peer2: 5 + 25 = ~30ms t.Fatal("Expected to have peers")
// peer3: 5 + 25 + 5 = ~35ms
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
} }
// should give first peer rating of 1 spm.AddPeer(peers[1])
if sessionPeers[0].OptimizationRating < 1.0 { if !spm.HasPeers() {
t.Fatal("Did not assign rating to best peer correctly") t.Fatal("Expected to have peers")
} }
// should give other optimized peers ratings between 0 & 1 spm.RemovePeer(peers[0])
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) || if !spm.HasPeers() {
(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) { t.Fatal("Expected to have peers")
t.Fatal("Did not assign rating to other optimized peers correctly")
} }
// should give other non-optimized peers rating of zero spm.RemovePeer(peers[1])
for i := 3; i < maxOptimizedPeers; i++ { if spm.HasPeers() {
if sessionPeers[i].OptimizationRating != 0.0 { t.Fatal("Expected to no longer have peers")
t.Fatal("Did not assign rating to unoptimized peer correctly")
} }
} }
c2 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests(nil, c2)
// Receive a second time func TestHasPeer(t *testing.T) {
sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c2[0]}) peers := testutil.GeneratePeers(2)
spm := New(1, &fakePeerTagger{})
// call again if spm.HasPeer(peers[0]) {
nextSessionPeers := sessionPeerManager.GetOptimizedPeers() t.Fatal("Expected not to have peer yet")
if len(nextSessionPeers) != maxOptimizedPeers {
t.Fatal(fmt.Sprintf("Should not return more (%d) than the max of optimized peers (%d)", len(nextSessionPeers), maxOptimizedPeers))
} }
// should sort by average latency spm.AddPeer(peers[0])
// peer1: ~5ms if !spm.HasPeer(peers[0]) {
// peer3: (~35ms + ~5ms) / 2 = ~20ms t.Fatal("Expected to have peer")
// peer2: ~30ms
if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
(nextSessionPeers[2].Peer != peer2) {
t.Fatal("Did not correctly update order of peers sorted by average latency")
} }
// should randomize other peers spm.AddPeer(peers[1])
totalSame := 0 if !spm.HasPeer(peers[1]) {
for i := 3; i < maxOptimizedPeers; i++ { t.Fatal("Expected to have peer")
if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
totalSame++
} }
}
if totalSame >= maxOptimizedPeers-3 {
t.Fatal("should not return the same random peers each time")
}
}
func TestTimeoutsAndCancels(t *testing.T) { spm.RemovePeer(peers[0])
ctx := context.Background() if spm.HasPeer(peers[0]) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second) t.Fatal("Expected not to have peer")
defer cancel()
peers := testutil.GeneratePeers(3)
completed := make(chan struct{})
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
} }
time.Sleep(2 * time.Millisecond)
sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)
// record broadcast if !spm.HasPeer(peers[1]) {
sessionPeerManager.RecordPeerRequests(nil, c) t.Fatal("Expected to have peer")
}
// record receives }
peer1 := peers[0]
peer2 := peers[1]
peer3 := peers[2]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, []cid.Cid{c[0]})
time.Sleep(2 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c[0]})
time.Sleep(40 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, []cid.Cid{c[0]})
sessionPeers := sessionPeerManager.GetOptimizedPeers() func TestPeers(t *testing.T) {
peers := testutil.GeneratePeers(2)
spm := New(1, &fakePeerTagger{})
// should prioritize peers which are fastest if len(spm.Peers()) > 0 {
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) { t.Fatal("Expected not to have peers yet")
t.Fatal("Did not prioritize peers that received blocks")
} }
// should give first peer rating of 1 spm.AddPeer(peers[0])
if sessionPeers[0].OptimizationRating < 1.0 { if len(spm.Peers()) != 1 {
t.Fatal("Did not assign rating to best peer correctly") t.Fatal("Expected to have one peer")
} }
// should give other optimized peers ratings between 0 & 1 spm.AddPeer(peers[1])
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) { if len(spm.Peers()) != 2 {
t.Fatal("Did not assign rating to other optimized peers correctly") t.Fatal("Expected to have two peers")
} }
// should not record a response for a broadcast return that arrived AFTER the timeout period spm.RemovePeer(peers[0])
// leaving peer unoptimized if len(spm.Peers()) != 1 {
if sessionPeers[2].OptimizationRating != 0 { t.Fatal("Expected to have one peer")
t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
} }
}
// now we make a targeted request, which SHOULD affect peer func TestPeersDiscovered(t *testing.T) {
// rating if it times out peers := testutil.GeneratePeers(2)
c2 := testutil.GenerateCids(1) spm := New(1, &fakePeerTagger{})
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again if spm.PeersDiscovered() {
nextSessionPeers := sessionPeerManager.GetOptimizedPeers() t.Fatal("Expected not to have discovered peers yet")
if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should have affected optimization rating but did not")
} }
// now we make a targeted request, but later cancel it spm.AddPeer(peers[0])
// timing out should not affect rating if !spm.PeersDiscovered() {
c3 := testutil.GenerateCids(1) t.Fatal("Expected to have discovered peers")
}
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
sessionPeerManager.RecordCancels([]cid.Cid{c3[0]})
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again spm.RemovePeer(peers[0])
thirdSessionPeers := sessionPeerManager.GetOptimizedPeers() if !spm.PeersDiscovered() {
if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating { t.Fatal("Expected to still have discovered peers")
t.Fatal("Timeout should not have affected optimization rating but did")
} }
}
// if we make a targeted request that is then cancelled, but we still func TestPeerTagging(t *testing.T) {
// receive the block before the timeout, it's worth recording and affecting latency peers := testutil.GeneratePeers(2)
fpt := &fakePeerTagger{}
spm := New(1, fpt)
c4 := testutil.GenerateCids(1) spm.AddPeer(peers[0])
if len(fpt.taggedPeers) != 1 {
t.Fatal("Expected to have tagged one peer")
}
// Request again spm.AddPeer(peers[0])
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4) if len(fpt.taggedPeers) != 1 {
sessionPeerManager.RecordCancels([]cid.Cid{c4[0]}) t.Fatal("Expected to have tagged one peer")
time.Sleep(2 * time.Millisecond) }
sessionPeerManager.RecordPeerResponse(peer2, []cid.Cid{c4[0]})
time.Sleep(2 * time.Millisecond)
// call again spm.AddPeer(peers[1])
fourthSessionPeers := sessionPeerManager.GetOptimizedPeers() if len(fpt.taggedPeers) != 2 {
if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating { t.Fatal("Expected to have tagged two peers")
t.Fatal("Timeout should have affected optimization rating but did not")
} }
// ensure all peer latency tracking has been cleaned up spm.RemovePeer(peers[1])
if len(sessionPeerManager.activePeers[peer2].lt.requests) > 0 { if len(fpt.taggedPeers) != 1 {
t.Fatal("Latency request tracking should have been cleaned up but was not") t.Fatal("Expected to have untagged peer")
} }
} }
func TestUntaggingPeers(t *testing.T) { func TestShutdown(t *testing.T) {
ctx := context.Background() peers := testutil.GeneratePeers(2)
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(5)
completed := make(chan struct{})
fpt := &fakePeerTagger{} fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed} spm := New(1, fpt)
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
sessionPeerManager.FindMorePeers(ctx, c) spm.AddPeer(peers[0])
select { spm.AddPeer(peers[1])
case <-completed: if len(fpt.taggedPeers) != 2 {
case <-ctx.Done(): t.Fatal("Expected to have tagged two peers")
t.Fatal("Did not finish finding providers")
} }
time.Sleep(15 * time.Millisecond)
if fpt.count() != len(peers) { spm.Shutdown()
t.Fatal("Peers were not tagged!")
}
<-ctx.Done()
fpt.wait.Wait()
if fpt.count() != 0 { if len(fpt.taggedPeers) != 0 {
t.Fatal("Peers were not untagged!") t.Fatal("Expected to have untagged all peers")
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment