Commit e6b35e97 authored by Dirk McCormick's avatar Dirk McCormick

fix: don't ignore received blocks for pending wants

parent cbb8d35c
......@@ -273,14 +273,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom("", []blocks.Block{blk})
return bs.receiveBlocksFrom(nil, "", []blocks.Block{blk})
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.wm.IsWanted(b.Cid()) {
if bs.sm.InterestedIn(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
......@@ -354,6 +354,12 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
}
}
if from != "" {
for _, b := range wanted {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}
return nil
}
......@@ -382,17 +388,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
// Process blocks
err := bs.receiveBlocksFrom(p, iblocks)
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
for _, b := range iblocks {
if bs.wm.IsWanted(b.Cid()) {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}
}
func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
......
......@@ -21,6 +21,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
peer "github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
travis "github.com/libp2p/go-libp2p-testing/ci/travis"
tu "github.com/libp2p/go-libp2p-testing/etc"
......@@ -138,6 +139,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
}
}
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......@@ -170,6 +173,68 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
}
}
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4
ig := testinstance.NewTestInstanceGenerator(net)
defer ig.Close()
instance := ig.Instances(1)[0]
defer instance.Exchange.Close()
oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Request enough blocks to exceed the session's broadcast want list
// capacity (by one block). The session will put the remaining block
// into the "tofetch" queue
blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
ks := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
ks = append(ks, b.Cid())
}
outch, err := instance.Exchange.GetBlocks(ctx, ks)
if err != nil {
t.Fatal(err)
}
// Wait a little while to make sure the session has time to process the wants
time.Sleep(time.Millisecond * 20)
// Simulate receiving a message which contains the block in the "tofetch" queue
lastBlock := blks[len(blks)-1]
bsMessage := message.New(true)
bsMessage.AddBlock(lastBlock)
unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)
// Make sure Bitswap adds the block to the output channel
blkrecvd, ok := <-outch
if !ok {
t.Fatal("timed out waiting for block")
}
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}
// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}
func TestLargeSwarm(t *testing.T) {
if testing.Short() {
t.SkipNow()
......
......@@ -131,3 +131,17 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
s.session.ReceiveFrom(from, sessKs)
}
}
// InterestedIn indicates whether any of the sessions are waiting to receive
// the block with the given CID.
func (sm *SessionManager) InterestedIn(cid cid.Cid) bool {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
if s.session.InterestedIn(cid) {
return true
}
}
return false
}
......@@ -176,6 +176,33 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
}
}
func TestInterestedIn(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
blks := testutil.GenerateBlocksOfSize(4, 1024)
var cids []cid.Cid
for _, b := range blks {
cids = append(cids, b.Cid())
}
nextInterestedIn = []cid.Cid{cids[0], cids[1]}
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextInterestedIn = []cid.Cid{cids[0], cids[2]}
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if !sm.InterestedIn(cids[0]) ||
!sm.InterestedIn(cids[1]) ||
!sm.InterestedIn(cids[2]) {
t.Fatal("expected interest but session manager was not interested")
}
if sm.InterestedIn(cids[3]) {
t.Fatal("expected no interest but session manager was interested")
}
}
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
......
......@@ -80,22 +80,6 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe
wm.addEntries(context.Background(), ks, peers, true, ses)
}
// IsWanted returns whether a CID is currently wanted.
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool, 1)
select {
case wm.wantMessages <- &isWantedMessage{c, resp}:
case <-wm.ctx.Done():
return false
}
select {
case wanted := <-resp:
return wanted
case <-wm.ctx.Done():
return false
}
}
// CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []wantlist.Entry {
resp := make(chan []wantlist.Entry, 1)
......@@ -232,16 +216,6 @@ func (ws *wantSet) handle(wm *WantManager) {
wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from)
}
type isWantedMessage struct {
c cid.Cid
resp chan<- bool
}
func (iwm *isWantedMessage) handle(wm *WantManager) {
_, isWanted := wm.wl.Contains(iwm.c)
iwm.resp <- isWanted
}
type currentWantsMessage struct {
resp chan<- []wantlist.Entry
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment