From 11de3643840752708d2473400954fa224150ea35 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Wed, 27 May 2015 19:03:39 -0700 Subject: [PATCH] parallelize block processing --- exchange/bitswap/bitswap.go | 54 +++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index d103687d2..7e8a0f7af 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -279,39 +279,41 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key for _, block := range iblocks { - keys = append(keys, block.Key()) - } - bs.wm.CancelWants(keys) - - for _, block := range iblocks { - bs.counterLk.Lock() - bs.blocksRecvd++ - has, err := bs.blockstore.Has(block.Key()) - if err == nil && has { - bs.dupBlocksRecvd++ - } - brecvd := bs.blocksRecvd - bdup := bs.dupBlocksRecvd - bs.counterLk.Unlock() - if has { - continue - } - - // put this after the duplicate check as a block not on our wantlist may - // have already been received. if _, found := bs.wm.wl.Contains(block.Key()); !found { log.Notice("received un-asked-for block: %s", block) continue } + keys = append(keys, block.Key()) + } + bs.wm.CancelWants(keys) - log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) + wg := sync.WaitGroup{} + for _, block := range iblocks { + wg.Add(1) + go func(b *blocks.Block) { + defer wg.Done() + bs.counterLk.Lock() + bs.blocksRecvd++ + has, err := bs.blockstore.Has(b.Key()) + if err == nil && has { + bs.dupBlocksRecvd++ + } + brecvd := bs.blocksRecvd + bdup := bs.dupBlocksRecvd + bs.counterLk.Unlock() + if has { + return + } - hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) - if err := bs.HasBlock(hasBlockCtx, block); err != nil { - log.Warningf("ReceiveMessage HasBlock error: %s", err) - } - cancel() + log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) + hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) + if err := bs.HasBlock(hasBlockCtx, b); err != nil { + log.Warningf("ReceiveMessage HasBlock error: %s", err) + } + cancel() + }(block) } + wg.Wait() } // Connected/Disconnected warns bitswap about peer connections -- GitLab