Commit ab9b7363 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #4169 from ipfs/fix/4062

add blocks to the blockstore before returning them from blockservice sessions.
parents 9c0f9362 b22d3fbf
...@@ -298,6 +298,14 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { ...@@ -298,6 +298,14 @@ func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error { func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlockFrom(blk, "")
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return errors.New("bitswap is closed") return errors.New("bitswap is closed")
...@@ -317,8 +325,11 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { ...@@ -317,8 +325,11 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// it now as it requires more thought and isnt causing immediate problems. // it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
for _, s := range bs.SessionsForBlock(blk.Cid()) { k := blk.Cid()
s.receiveBlockFrom("", blk) ks := []*cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
bs.CancelWants(ks, s.id)
} }
bs.engine.AddBlock(blk) bs.engine.AddBlock(blk)
...@@ -379,21 +390,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -379,21 +390,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
bs.updateReceiveCounters(b) bs.updateReceiveCounters(b)
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
for _, ses := range bs.SessionsForBlock(k) {
ses.receiveBlockFrom(p, b)
bs.CancelWants([]*cid.Cid{k}, ses.id)
}
log.Debugf("got block %s from %s", b, p) log.Debugf("got block %s from %s", b, p)
// TODO: rework this to not call 'HasBlock'. 'HasBlock' is really
// designed to be called when blocks are coming in from non-bitswap if err := bs.receiveBlockFrom(b, p); err != nil {
// places (like the user manually adding data) log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}(block) }(block)
} }
wg.Wait() wg.Wait()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment