From 6ab4bfea95464a3e995425a716213ed342dbeac5 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Sat, 16 May 2015 17:46:26 -0700 Subject: [PATCH] turn tests down a bit and better context passing --- exchange/bitswap/bitswap.go | 4 +-- exchange/bitswap/bitswap_test.go | 4 +-- .../{peermanager.go => wantmanager.go} | 26 ++++++++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) rename exchange/bitswap/{peermanager.go => wantmanager.go} (89%) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 6a1e58ff4..c6f3c74a9 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -86,9 +86,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), - wm: NewWantManager(network), + wm: NewWantManager(ctx, network), } - go bs.wm.Run(ctx) + go bs.wm.Run() network.SetDelegate(bs) // Start up bitswaps async worker routines diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index fa5b3b97d..86eb2d764 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -92,7 +92,7 @@ func TestLargeSwarm(t *testing.T) { if testing.Short() { t.SkipNow() } - numInstances := 500 + numInstances := 100 numBlocks := 2 if detectrace.WithRace() { // when running with the race detector, 500 instances launches @@ -124,7 +124,6 @@ func TestLargeFileTwoPeers(t *testing.T) { if testing.Short() { t.SkipNow() } - t.Parallel() numInstances := 2 numBlocks := 100 PerformDistributionTest(t, numInstances, numBlocks) @@ -164,6 +163,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } for _ = range outch { } + log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/wantmanager.go similarity index 89% rename from exchange/bitswap/peermanager.go rename to exchange/bitswap/wantmanager.go index 8ec89c8e3..3b2067914 100644 --- a/exchange/bitswap/peermanager.go +++ b/exchange/bitswap/wantmanager.go @@ -28,9 +28,11 @@ type WantManager struct { wl *wantlist.Wantlist network bsnet.BitSwapNetwork + + ctx context.Context } -func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { +func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager { return &WantManager{ incoming: make(chan []*bsmsg.Entry, 10), connect: make(chan peer.ID, 10), @@ -38,6 +40,7 @@ func NewWantManager(network bsnet.BitSwapNetwork) *WantManager { peers: make(map[peer.ID]*msgQueue), wl: wantlist.New(), network: network, + ctx: ctx, } } @@ -80,7 +83,10 @@ func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { }, }) } - pm.incoming <- entries + select { + case pm.incoming <- entries: + case <-pm.ctx.Done(): + } } func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { @@ -97,7 +103,7 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { } } -func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueue { +func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { _, ok := pm.peers[p] if ok { // TODO: log an error? @@ -116,7 +122,7 @@ func (pm *WantManager) startPeerHandler(ctx context.Context, p peer.ID) *msgQueu mq.work <- struct{}{} pm.peers[p] = mq - go pm.runQueue(ctx, mq) + go pm.runQueue(mq) return mq } @@ -131,12 +137,12 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { delete(pm.peers, p) } -func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { +func (pm *WantManager) runQueue(mq *msgQueue) { for { select { case <-mq.work: // there is work to be done - err := pm.network.ConnectTo(ctx, mq.p) + err := pm.network.ConnectTo(pm.ctx, mq.p) if err != nil { log.Error(err) // TODO: cant connect, what now? @@ -153,7 +159,7 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) { mq.outlk.Unlock() // send wantlist updates - err = pm.network.SendMessage(ctx, mq.p, wlm) + err = pm.network.SendMessage(pm.ctx, mq.p, wlm) if err != nil { log.Error("bitswap send error: ", err) // TODO: what do we do if this fails? @@ -173,7 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { } // TODO: use goprocess here once i trust it -func (pm *WantManager) Run(ctx context.Context) { +func (pm *WantManager) Run() { for { select { case entries := <-pm.incoming: @@ -193,10 +199,10 @@ func (pm *WantManager) Run(ctx context.Context) { } case p := <-pm.connect: - pm.startPeerHandler(ctx, p) + pm.startPeerHandler(p) case p := <-pm.disconnect: pm.stopPeerHandler(p) - case <-ctx.Done(): + case <-pm.ctx.Done(): return } } -- GitLab