Unverified Commit 9be7169f authored by Steven Allen, committed by GitHub

Merge pull request #628 from libp2p/test/fix-hung-test

fix all flaky tests
parents a7fb7945 297911ca
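The core of the flake fix is a pattern change: waitForWellFormedTables stops returning a bool for callers to assert on and instead fails the calling test itself via t.Helper and t.Errorf. A minimal standalone sketch of that polling-assertion pattern (waitForCond is a hypothetical name, not the project's actual helper):

package dht

import (
	"testing"
	"time"
)

// waitForCond polls cond every few milliseconds until it returns true
// or the timeout elapses, failing the calling test on timeout. Because
// it calls t.Helper(), the failure is reported at the caller's line.
func waitForCond(t *testing.T, cond func() bool, timeout time.Duration) {
	t.Helper()
	deadline := time.After(timeout)
	for {
		select {
		case <-deadline:
			t.Errorf("condition not reached after %s", timeout)
			return
		case <-time.After(5 * time.Millisecond):
			if cond() {
				return
			}
		}
	}
}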
@@ -32,7 +32,6 @@ import (
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
@@ -129,7 +128,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option)
return d
}
- func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
+ func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*IpfsDHT {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
peers := make([]peer.ID, n)
@@ -138,7 +137,7 @@ func setupDHTS(t *testing.T, ctx context.Context, n int) []*IpfsDHT {
sanityPeersMap := make(map[string]struct{})
for i := 0; i < n; i++ {
- dhts[i] = setupDHT(ctx, t, false)
+ dhts[i] = setupDHT(ctx, t, false, options...)
peers[i] = dhts[i].PeerID()
addrs[i] = dhts[i].host.Addrs()[0]
@@ -673,8 +672,9 @@ func TestLocalProvides(t *testing.T) {
}
// if minPeers or avgPeers is 0, don't test for it.
- func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
+ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) {
// test "well-formed-ness" (>= minPeers peers in every routing table)
+ t.Helper()
checkTables := func() bool {
totalPeers := 0
@@ -699,11 +699,12 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i
for {
select {
case <-timeoutA:
- logger.Debugf("did not reach well-formed routing tables by %s", timeout)
- return false // failed
+ t.Errorf("failed to reach well-formed routing tables after %s", timeout)
+ return
case <-time.After(5 * time.Millisecond):
if checkTables() {
- return true // succeeded
+ // succeeded
+ return
}
}
}
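With t.Helper() in place, a failed wait is attributed to the test that called waitForWellFormedTables rather than to the helper's own t.Errorf line, so callers can drop their require.True wrappers (see the TestHandleRemotePeerProtocolChanges hunk further down) and simply write:

	waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second)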
@@ -760,6 +761,7 @@ func TestRefresh(t *testing.T) {
}
+ waitForWellFormedTables(t, dhts, 7, 10, 10*time.Second)
cancelT()
if u.Debug {
@@ -830,12 +832,12 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
}
func TestPeriodicRefresh(t *testing.T) {
- if ci.IsRunning() {
- t.Skip("skipping on CI. highly timing dependent")
- }
if testing.Short() {
t.SkipNow()
}
+ if detectrace.WithRace() {
+ t.Skip("skipping due to race detector max goroutines")
+ }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -894,7 +896,9 @@ func TestPeriodicRefresh(t *testing.T) {
}
func TestProvidesMany(t *testing.T) {
t.Skip("this test doesn't work")
if detectrace.WithRace() {
t.Skip("skipping due to race detector max goroutines")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
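Both tests above now gate on the race detector instead of on CI: the Go race detector caps a process at 8,128 concurrently live goroutines, and these tests spin up many DHT nodes. The gate appears to use github.com/ipfs/go-detect-race; a minimal sketch of the same pattern (TestManyGoroutines is a hypothetical test name):

package dht

import (
	"testing"

	detectrace "github.com/ipfs/go-detect-race"
)

func TestManyGoroutines(t *testing.T) {
	// WithRace reports whether this binary was built with -race.
	if detectrace.WithRace() {
		t.Skip("skipping due to race detector max goroutines")
	}
	// ... safe to spawn thousands of goroutines here ...
}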
@@ -1149,9 +1153,6 @@ func TestConnectCollision(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
- if ci.IsRunning() {
- t.Skip("Skipping on CI.")
- }
runTimes := 10
@@ -1337,7 +1338,7 @@ func minInt(a, b int) int {
}
func TestFindPeerQueryMinimal(t *testing.T) {
- testFindPeerQuery(t, 2, 22, 11)
+ testFindPeerQuery(t, 2, 22, 1)
}
func TestFindPeerQuery(t *testing.T) {
@@ -1348,22 +1349,19 @@ func TestFindPeerQuery(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
- if curFileLimit() < 1024 {
- t.Skip("insufficient file descriptors available")
- }
- testFindPeerQuery(t, 20, 80, 16)
+ testFindPeerQuery(t, 5, 40, 3)
}
// NOTE: You must have AT LEAST (minRTRefreshThreshold+1) test peers before using this.
func testFindPeerQuery(t *testing.T,
bootstrappers, // Number of nodes connected to the querying node
leafs, // Number of nodes that might be connected to from the bootstrappers
- bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
+ bootstrapConns int, // Number of bootstrappers each leaf should connect to.
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs)
+ dhts := setupDHTS(t, ctx, 1+bootstrappers+leafs, BucketSize(4))
defer func() {
for _, d := range dhts {
d.Close()
@@ -1371,13 +1369,14 @@ func testFindPeerQuery(t *testing.T,
}
}()
t.Log("connecting")
mrand := rand.New(rand.NewSource(42))
guy := dhts[0]
others := dhts[1:]
- for i := 0; i < bootstrappers; i++ {
- for j := 0; j < bootstrapperLeafConns; j++ {
- v := mrand.Intn(leafs)
- connectNoSync(t, ctx, others[i], others[bootstrappers+v])
+ for i := 0; i < leafs; i++ {
+ for _, v := range mrand.Perm(bootstrappers)[:bootstrapConns] {
+ connectNoSync(t, ctx, others[v], others[bootstrappers+i])
}
}
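The rewritten wiring loop replaces mrand.Intn, which could pick the same bootstrapper twice for one leaf, with mrand.Perm, which hands each leaf a set of distinct bootstrappers, and the fixed seed makes the topology reproducible across runs. A self-contained sketch of the same selection technique (the sizes here are illustrative, not the test's):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	mrand := rand.New(rand.NewSource(42)) // fixed seed: same topology every run
	bootstrappers, bootstrapConns := 5, 3
	for leaf := 0; leaf < 4; leaf++ {
		// Perm returns a random permutation of 0..bootstrappers-1;
		// taking a prefix yields bootstrapConns distinct picks.
		picks := mrand.Perm(bootstrappers)[:bootstrapConns]
		fmt.Printf("leaf %d connects to bootstrappers %v\n", leaf, picks)
	}
}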
@@ -1385,25 +1384,35 @@ func testFindPeerQuery(t *testing.T,
connectNoSync(t, ctx, guy, others[i])
}
t.Log("waiting for routing tables")
// give some time for things to settle down
- waitForWellFormedTables(t, dhts, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second)
+ waitForWellFormedTables(t, dhts, bootstrapConns, bootstrapConns, 5*time.Second)
- for _, d := range dhts {
- if len(d.RoutingTable().ListPeers()) > 0 {
- if err := <-d.RefreshRoutingTable(); err != nil {
- t.Fatal(err)
- }
- }
+ t.Log("refreshing")
+ var wg sync.WaitGroup
+ for _, dht := range dhts {
+ wg.Add(1)
+ go func(d *IpfsDHT) {
+ <-d.RefreshRoutingTable()
+ wg.Done()
+ }(dht)
+ }
- var reachableIds []peer.ID
- for i, d := range dhts {
- lp := len(d.host.Network().Peers())
- if i != 0 && lp > 0 {
- reachableIds = append(reachableIds, d.PeerID())
- }
+ wg.Wait()
+ t.Log("waiting for routing tables again")
+ // Wait for refresh to work. At least one bucket should be full.
+ waitForWellFormedTables(t, dhts, 4, 0, 5*time.Second)
+ var peers []peer.ID
+ for _, d := range others {
+ peers = append(peers, d.PeerID())
+ }
- t.Logf("%d reachable ids", len(reachableIds))
t.Log("querying")
val := "foobar"
rtval := kb.ConvertKey(val)
@@ -1418,7 +1427,7 @@ func testFindPeerQuery(t *testing.T,
sort.Sort(peer.IDSlice(outpeers))
- exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(guy.bucketSize, len(reachableIds))]
+ exp := kb.SortClosestPeers(peers, rtval)[:minInt(guy.bucketSize, len(peers))]
t.Logf("got %d peers", len(outpeers))
got := kb.SortClosestPeers(outpeers, rtval)
@@ -1588,19 +1597,19 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) {
connect(t, ctx, dhtA, dhtB)
// now assert both have each other in their RT
- require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second), "both RT should have one peer each")
+ waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second)
// dhtB becomes a client
require.NoError(t, dhtB.setMode(modeClient))
// which means that dhtA should evict it from its RT
- require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second), "dHTA routing table should have 0 peers")
+ waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second)
// dhtB becomes a server
require.NoError(t, dhtB.setMode(modeServer))
// which means dhtA should have it in the RT again because of fixLowPeers
- require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second), "dHTA routing table should have 1 peers")
+ waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second)
}
func TestGetSetPluggedProtocol(t *testing.T) {
...
@@ -20,20 +20,21 @@ import (
ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
"github.com/stretchr/testify/require"
)
// Test that one hung request to a peer doesn't prevent another request
// using that same peer from obeying its context.
func TestHungRequest(t *testing.T) {
- ctx := context.Background()
- mn, err := mocknet.FullMeshConnected(ctx, 2)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ mn, err := mocknet.FullMeshLinked(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
- os := []Option{testPrefix, DisableAutoRefresh()}
+ os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
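The switch from FullMeshConnected to FullMeshLinked matters here: linking makes peers dialable without actually opening connections, so the DHT can be constructed first and the connections established explicitly afterwards (as the next hunk does with ConnectAllButSelf). This reading of the mocknet API is an assumption worth checking against go-libp2p's docs; in outline (the test name is hypothetical):

package dht

import (
	"context"
	"testing"

	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

func TestLinkedThenConnected(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Links only: peers are dialable but not yet connected.
	mn, err := mocknet.FullMeshLinked(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	// ... construct the DHT on mn.Hosts()[0] here, so it observes
	// the connection notifications when they fire ...
	if err := mn.ConnectAllButSelf(); err != nil {
		t.Fatal("failed to connect peers", err)
	}
}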
@@ -46,8 +47,18 @@ func TestHungRequest(t *testing.T) {
})
}
require.NoError(t, hosts[0].Peerstore().AddProtocols(hosts[1].ID(), protocol.ConvertToStrings(d.serverProtocols)...))
- d.peerFound(ctx, hosts[1].ID(), true)
+ err = mn.ConnectAllButSelf()
+ if err != nil {
+ t.Fatal("failed to connect peers", err)
+ }
+ // Wait a bit for a peer to land in our routing table.
+ for i := 0; i < 100 && d.routingTable.Size() == 0; i++ {
+ time.Sleep(10 * time.Millisecond)
+ }
+ if d.routingTable.Size() == 0 {
+ t.Fatal("failed to fill routing table")
+ }
ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
defer cancel1()
@@ -66,8 +77,8 @@ func TestHungRequest(t *testing.T) {
t.Errorf("expected to fail with deadline exceeded, got: %s", ctx2.Err())
}
select {
- case <-done:
- t.Errorf("GetClosestPeers should not have returned yet")
+ case err = <-done:
+ t.Error("GetClosestPeers should not have returned yet", err)
default:
err = <-done
if err != context.DeadlineExceeded {
@@ -75,6 +86,10 @@ func TestHungRequest(t *testing.T) {
}
}
+ if d.routingTable.Size() == 0 {
+ // make sure we didn't just disconnect
+ t.Fatal("expected peers in the routing table")
+ }
}
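A small but real fix above: the old case clause discarded the value sent on done, while the new one binds it so the unexpected early result can be logged. The non-blocking-receive idiom it relies on, in isolation (checkStillRunning is a hypothetical helper, not project code):

package dht

import "testing"

func checkStillRunning(t *testing.T, done <-chan error) {
	t.Helper()
	// Non-blocking receive: the case only fires if the query
	// already returned, which would be a test failure here.
	select {
	case err := <-done:
		t.Error("GetClosestPeers should not have returned yet", err)
	default:
		// Nothing ready: the query is still (correctly) in flight.
	}
}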
func TestGetFailures(t *testing.T) {
@@ -202,6 +217,11 @@ func TestGetFailures(t *testing.T) {
t.Fatal("shouldnt have provider peers")
}
}
+ if d.routingTable.Size() == 0 {
+ // make sure we didn't just disconnect
+ t.Fatal("expected peers in the routing table")
+ }
}
func TestNotFound(t *testing.T) {
@@ -217,16 +237,12 @@ func TestNotFound(t *testing.T) {
}
hosts := mn.Hosts()
- os := []Option{testPrefix, DisableAutoRefresh()}
+ os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
- for _, p := range hosts {
- d.peerFound(ctx, p.ID(), true)
- }
// Reply with random peers to every message
for _, host := range hosts {
host := host // shadow loop var
@@ -239,7 +255,8 @@ func TestNotFound(t *testing.T) {
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
- panic(err)
+ // this isn't an error, it just means the stream has died.
+ return
}
switch pmes.GetType() {
@@ -255,13 +272,23 @@ func TestNotFound(t *testing.T) {
resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
if err := pbw.WriteMsg(resp); err != nil {
- panic(err)
+ return
}
default:
panic("Shouldnt recieve this.")
}
})
}
+ for _, peer := range hosts {
+ if host == peer {
+ continue
+ }
+ _ = peer.Peerstore().AddProtocols(host.ID(), protocol.ConvertToStrings(d.serverProtocols)...)
+ }
}
+ for _, p := range hosts {
+ d.peerFound(ctx, p.ID(), true)
+ }
// long timeout to ensure timing is not at play.
@@ -275,6 +302,10 @@ func TestNotFound(t *testing.T) {
}
switch err {
case routing.ErrNotFound:
+ if d.routingTable.Size() == 0 {
+ // make sure we didn't just disconnect
+ t.Fatal("expected peers in the routing table")
+ }
// Success!
return
case u.ErrTimeout:
@@ -299,7 +330,7 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()
- os := []Option{testPrefix, DisableAutoRefresh()}
+ os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@@ -371,7 +402,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
- os := []Option{testPrefix, DisableAutoRefresh()}
+ os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
...
+ // +build !windows,!wasm
+ package dht
+ import (
+ "fmt"
+ "os"
+ "syscall"
+ "testing"
+ )
+ func TestMain(m *testing.M) {
+ err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &syscall.Rlimit{
+ Cur: 4096,
+ Max: 4096,
+ })
+ if err != nil {
+ fmt.Println("failed to increase open file descriptor limit, can't run tests")
+ os.Exit(1)
+ }
+ os.Exit(m.Run())
+ }
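The new TestMain trades the old skip-when-low approach for raising RLIMIT_NOFILE up front, which is what lets the curFileLimit helpers below go away. A hypothetical companion test (not part of the commit) that verifies the raised limit took effect, using the same syscall package:

// +build !windows,!wasm

package dht

import (
	"syscall"
	"testing"
)

func TestFileLimitRaised(t *testing.T) {
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		t.Fatal(err)
	}
	// TestMain sets both Cur and Max to 4096 before running tests.
	if lim.Cur < 4096 {
		t.Fatalf("expected soft fd limit >= 4096, got %d", lim.Cur)
	}
}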
- // +build !windows,!wasm
- package dht
- import "syscall"
- func curFileLimit() uint64 {
- var n syscall.Rlimit
- _ = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &n) // used for testing, ignore error.
- // cast because some platforms use int64 (e.g., freebsd)
- return uint64(n.Cur)
- }
- package dht
- func curFileLimit() uint64 {
- return 16 * 1024 * 1024
- }