Commit 400a1aec authored by Jeromy Johnson's avatar Jeromy Johnson

Merge pull request #2316 from ipfs/fix/random-bstest-hangs

wait for peers in wantmanager to all appear
parents eaa4ff0c c73da848
...@@ -158,6 +158,19 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -158,6 +158,19 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Log("Give the blocks to the first instance") t.Log("Give the blocks to the first instance")
nump := len(instances) - 1
// assert we're properly connected
for _, inst := range instances {
peers := inst.Exchange.wm.ConnectedPeers()
for i := 0; i < 10 && len(peers) != nump; i++ {
time.Sleep(time.Millisecond * 50)
peers = inst.Exchange.wm.ConnectedPeers()
}
if len(peers) != nump {
t.Fatal("not enough peers connected to instance")
}
}
var blkeys []key.Key var blkeys []key.Key
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
......
...@@ -16,8 +16,9 @@ import ( ...@@ -16,8 +16,9 @@ import (
type WantManager struct { type WantManager struct {
// sync channels for Run loop // sync channels for Run loop
incoming chan []*bsmsg.Entry incoming chan []*bsmsg.Entry
connect chan peer.ID // notification channel for new peers connecting connect chan peer.ID // notification channel for new peers connecting
disconnect chan peer.ID // notification channel for peers disconnecting disconnect chan peer.ID // notification channel for peers disconnecting
peerReqs chan chan []peer.ID // channel to request connected peers on
// synchronized by Run loop, only touch inside there // synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue peers map[peer.ID]*msgQueue
...@@ -32,6 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana ...@@ -32,6 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
incoming: make(chan []*bsmsg.Entry, 10), incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10), connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10),
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue), peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(), wl: wantlist.NewThreadSafe(),
network: network, network: network,
...@@ -88,6 +90,12 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { ...@@ -88,6 +90,12 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
} }
} }
func (pm *WantManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID)
pm.peerReqs <- resp
return <-resp
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure // Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack // throughout the network stack
...@@ -242,6 +250,12 @@ func (pm *WantManager) Run() { ...@@ -242,6 +250,12 @@ func (pm *WantManager) Run() {
pm.startPeerHandler(p) pm.startPeerHandler(p)
case p := <-pm.disconnect: case p := <-pm.disconnect:
pm.stopPeerHandler(p) pm.stopPeerHandler(p)
case req := <-pm.peerReqs:
var peers []peer.ID
for p := range pm.peers {
peers = append(peers, p)
}
req <- peers
case <-pm.ctx.Done(): case <-pm.ctx.Done():
return return
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment