Commit 2045a7b3 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

turn tests down a bit and better context passing

parent 31198d43
......@@ -86,9 +86,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
wm: NewWantManager(network),
wm: NewWantManager(ctx, network),
}
go bs.wm.Run(ctx)
go bs.wm.Run()
network.SetDelegate(bs)
// Start up bitswaps async worker routines
......
......@@ -92,7 +92,7 @@ func TestLargeSwarm(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
numInstances := 500
numInstances := 100
numBlocks := 2
if detectrace.WithRace() {
// when running with the race detector, 500 instances launches
......@@ -124,7 +124,6 @@ func TestLargeFileTwoPeers(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
numInstances := 2
numBlocks := 100
PerformDistributionTest(t, numInstances, numBlocks)
......@@ -164,6 +163,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
for _ = range outch {
}
log.Error("DONE")
}(inst)
}
wg.Wait()
......
......@@ -28,9 +28,11 @@ type WantManager struct {
wl *wantlist.Wantlist
network bsnet.BitSwapNetwork
ctx context.Context
}
func NewWantManager(network bsnet.BitSwapNetwork) *WantManager {
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
return &WantManager{
incoming: make(chan []*bsmsg.Entry, 10),
connect: make(chan peer.ID, 10),
......@@ -38,6 +40,7 @@ func NewWantManager(network bsnet.BitSwapNetwork) *WantManager {
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.New(),
network: network,
ctx: ctx,
}
}
......@@ -80,7 +83,10 @@ func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
},
})
}
pm.incoming <- entries
select {
case pm.incoming <- entries:
case <-pm.ctx.Done():
}
}
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
......@@ -97,7 +103,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
}
}
func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue {
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
_, ok := pm.peers[p]
if ok {
// TODO: log an error?
......@@ -116,7 +122,7 @@ func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueu
mq.work <- struct{}{}
pm.peers[p] = mq
go pm.runQueue(ctx, mq)
go pm.runQueue(mq)
return mq
}
......@@ -131,12 +137,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
delete(pm.peers, p)
}
func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) {
func (pm *WantManager) runQueue(mq *msgQueue) {
for {
select {
case <-mq.work: // there is work to be done
err := pm.network.ConnectTo(ctx, mq.p)
err := pm.network.ConnectTo(pm.ctx, mq.p)
if err != nil {
log.Error(err)
// TODO: cant connect, what now?
......@@ -153,7 +159,7 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) {
mq.outlk.Unlock()
// send wantlist updates
err = pm.network.SendMessage(ctx, mq.p, wlm)
err = pm.network.SendMessage(pm.ctx, mq.p, wlm)
if err != nil {
log.Error("bitswap send error: ", err)
// TODO: what do we do if this fails?
......@@ -173,7 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) {
}
// TODO: use goprocess here once i trust it
func (pm *WantManager) Run(ctx context.Context) {
func (pm *WantManager) Run() {
for {
select {
case entries := <-pm.incoming:
......@@ -193,10 +199,10 @@ func (pm *WantManager) Run(ctx context.Context) {
}
case p := <-pm.connect:
pm.startPeerHandler(ctx, p)
pm.startPeerHandler(p)
case p := <-pm.disconnect:
pm.stopPeerHandler(p)
case <-ctx.Done():
case <-pm.ctx.Done():
return
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment