From ab161cf6b4f2cee1c3b63c943b640b120f9fca93 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Tue, 26 May 2015 11:14:44 -0700 Subject: [PATCH] clean up organization of receivemessage and fix race --- exchange/bitswap/bitswap.go | 25 +++++++++++++++++++------ exchange/bitswap/wantmanager.go | 4 ++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 58243e888..d103687d2 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -270,26 +270,40 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger - if len(incoming.Blocks()) == 0 { + iblocks := incoming.Blocks() + + if len(iblocks) == 0 { return } // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key - for _, block := range incoming.Blocks() { + for _, block := range iblocks { keys = append(keys, block.Key()) } bs.wm.CancelWants(keys) - for _, block := range incoming.Blocks() { + for _, block := range iblocks { bs.counterLk.Lock() bs.blocksRecvd++ - if has, err := bs.blockstore.Has(block.Key()); err == nil && has { + has, err := bs.blockstore.Has(block.Key()) + if err == nil && has { bs.dupBlocksRecvd++ } brecvd := bs.blocksRecvd bdup := bs.dupBlocksRecvd bs.counterLk.Unlock() + if has { + continue + } + + // put this after the duplicate check as a block not on our wantlist may + // have already been received. + if _, found := bs.wm.wl.Contains(block.Key()); !found { + log.Notice("received un-asked-for block: %s", block) + continue + } + log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) @@ -302,7 +316,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { - // TODO: add to clientWorker?? bs.wm.Connected(p) } @@ -313,7 +326,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { } func (bs *Bitswap) ReceiveError(err error) { - log.Debugf("Bitswap ReceiveError: %s", err) + log.Infof("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger } diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 5405f5074..e87453920 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -21,7 +21,7 @@ type WantManager struct { // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - wl *wantlist.Wantlist + wl *wantlist.ThreadSafe network bsnet.BitSwapNetwork ctx context.Context @@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peers: make(map[peer.ID]*msgQueue), - wl: wantlist.New(), + wl: wantlist.NewThreadSafe(), network: network, ctx: ctx, } -- GitLab