Commit 03e10a06 authored by hannahhoward's avatar hannahhoward

fix(tests): stabilize session tests

Improve stability of tests for Session and SessionPeerManager

fix #61
parent 14248b8d
......@@ -26,11 +26,17 @@ type fakeWantManager struct {
}
func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.wantReqs <- wantReq{cids, peers}
select {
case fwm.wantReqs <- wantReq{cids, peers}:
case <-ctx.Done():
}
}
func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.cancelReqs <- wantReq{cids, peers}
select {
case fwm.cancelReqs <- wantReq{cids, peers}:
case <-ctx.Done():
}
}
type fakePeerManager struct {
......@@ -39,8 +45,11 @@ type fakePeerManager struct {
findMorePeersRequested chan struct{}
}
func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
fpm.findMorePeersRequested <- struct{}{}
func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
select {
case fpm.findMorePeersRequested <- struct{}{}:
case <-ctx.Done():
}
}
func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
......@@ -105,10 +114,20 @@ func TestSessionGetBlocks(t *testing.T) {
var receivedBlocks []blocks.Block
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
select {
case cancelBlock := <-cancelReqs:
newCancelReqs = append(newCancelReqs, cancelBlock)
case <-ctx.Done():
t.Fatal("did not cancel block want")
}
select {
case receivedBlock := <-getBlocksCh:
receivedBlocks = append(receivedBlocks, receivedBlock)
case <-ctx.Done():
t.Fatal("Did not receive block!")
}
select {
case wantBlock := <-wantReqs:
newBlockReqs = append(newBlockReqs, wantBlock)
......@@ -169,7 +188,7 @@ func TestSessionGetBlocks(t *testing.T) {
func TestSessionFindMorePeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
......@@ -191,26 +210,51 @@ func TestSessionFindMorePeers(t *testing.T) {
}
// clear the initial block of wants
<-wantReqs
select {
case <-wantReqs:
case <-ctx.Done():
t.Fatal("Did not make first want request ")
}
// receive a block to trigger a tick reset
time.Sleep(200 * time.Microsecond)
time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
// or there will be no tick set -- time precision on Windows in go is in the
// millisecond range
p := testutil.GeneratePeers(1)[0]
session.ReceiveBlockFrom(p, blks[0])
<-getBlocksCh
<-wantReqs
<-cancelReqs
// wait for a request to get more peers to occur
<-fpm.findMorePeersRequested
select {
case <-cancelReqs:
case <-ctx.Done():
t.Fatal("Did not cancel block")
}
select {
case <-getBlocksCh:
case <-ctx.Done():
t.Fatal("Did not get block")
}
select {
case <-wantReqs:
case <-ctx.Done():
t.Fatal("Did not make second want request ")
}
// verify a broadcast was made
receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
select {
case receivedWantReq := <-wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
if receivedWantReq.peers != nil {
t.Fatal("did not make a broadcast")
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
if receivedWantReq.peers != nil {
t.Fatal("did not make a broadcast")
// wait for a request to get more peers to occur
select {
case <-fpm.findMorePeersRequested:
case <-ctx.Done():
t.Fatal("Did not find more peers")
}
<-ctx.Done()
}
......@@ -2,6 +2,7 @@ package sessionpeermanager
import (
"context"
"errors"
"math/rand"
"sync"
"testing"
......@@ -18,27 +19,40 @@ import (
type fakePeerNetwork struct {
peers []peer.ID
connManager ifconnmgr.ConnManager
completed chan struct{}
connect chan struct{}
}
func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager {
return fpn.connManager
}
func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error {
return nil
func (fpn *fakePeerNetwork) ConnectTo(ctx context.Context, p peer.ID) error {
select {
case fpn.connect <- struct{}{}:
return nil
case <-ctx.Done():
return errors.New("Timeout Occurred")
}
}
func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
defer close(peerCh)
for _, p := range fpn.peers {
select {
case peerCh <- p:
case <-ctx.Done():
close(peerCh)
return
}
}
close(peerCh)
select {
case fpn.completed <- struct{}{}:
case <-ctx.Done():
}
}()
return peerCh
}
......@@ -55,7 +69,6 @@ func (fcm *fakeConnManager) TagPeer(p peer.ID, tag string, n int) {
func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) {
defer fcm.wait.Done()
for i := 0; i < len(fcm.taggedPeers); i++ {
if fcm.taggedPeers[i] == p {
fcm.taggedPeers[i] = fcm.taggedPeers[len(fcm.taggedPeers)-1]
......@@ -63,7 +76,6 @@ func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) {
return
}
}
}
func (*fakeConnManager) GetTagInfo(p peer.ID) *ifconnmgr.TagInfo { return nil }
......@@ -74,9 +86,12 @@ func TestFindingMorePeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
completed := make(chan struct{})
connect := make(chan struct{})
peers := testutil.GeneratePeers(5)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
......@@ -85,7 +100,20 @@ func TestFindingMorePeers(t *testing.T) {
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer findCancel()
sessionPeerManager.FindMorePeers(ctx, c)
<-findCtx.Done()
select {
case <-completed:
case <-findCtx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-findCtx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != len(peers) {
t.Fatal("incorrect number of peers found")
......@@ -106,7 +134,7 @@ func TestRecordingReceivedBlocks(t *testing.T) {
defer cancel()
p := testutil.GeneratePeers(1)[0]
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{nil, fcm}
fpn := &fakePeerNetwork{nil, fcm, nil, nil}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
......@@ -127,17 +155,32 @@ func TestRecordingReceivedBlocks(t *testing.T) {
func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(100)
completed := make(chan struct{})
connect := make(chan struct{})
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-ctx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
......@@ -193,15 +236,30 @@ func TestUntaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(5)
completed := make(chan struct{})
connect := make(chan struct{})
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
fpn := &fakePeerNetwork{peers, fcm, completed, connect}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
sessionPeerManager.FindMorePeers(ctx, c)
time.Sleep(5 * time.Millisecond)
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
for range peers {
select {
case <-connect:
case <-ctx.Done():
t.Fatal("Did not connect to peer")
}
}
time.Sleep(2 * time.Millisecond)
if len(fcm.taggedPeers) != len(peers) {
t.Fatal("Peers were not tagged!")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment