Unverified Commit 418d88c9 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #276 from ipfs/fix/prune-dont-have

Be less aggressive when pruning peers from session
parents 159748c3 568a984c
...@@ -130,7 +130,7 @@ var mixedBenches = []mixedBench{ ...@@ -130,7 +130,7 @@ var mixedBenches = []mixedBench{
mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2}, mixedBench{bench{"3Nodes-Overlap3-OneAtATime", 3, 10, overlap2, oneAtATime}, 1, 2},
mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2}, mixedBench{bench{"3Nodes-AllToAll-OneAtATime", 3, 10, allToAll, oneAtATime}, 1, 2},
mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2}, mixedBench{bench{"3Nodes-Overlap3-AllConcurrent", 3, 10, overlap2, fetchAllConcurrent}, 1, 2},
mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2}, // mixedBench{bench{"3Nodes-Overlap3-UnixfsFetch", 3, 100, overlap2, unixfsFileFetch}, 1, 2},
} }
func BenchmarkFetchFromOldBitswap(b *testing.B) { func BenchmarkFetchFromOldBitswap(b *testing.B) {
......
...@@ -1007,9 +1007,9 @@ func TestTaggingPeers(t *testing.T) { ...@@ -1007,9 +1007,9 @@ func TestTaggingPeers(t *testing.T) {
} }
func TestTaggingUseful(t *testing.T) { func TestTaggingUseful(t *testing.T) {
peerSampleInterval := 2 * time.Millisecond peerSampleInterval := 5 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
me := newTestEngine(ctx, "engine", peerSampleInterval) me := newTestEngine(ctx, "engine", peerSampleInterval)
friend := peer.ID("friend") friend := peer.ID("friend")
...@@ -1023,11 +1023,11 @@ func TestTaggingUseful(t *testing.T) { ...@@ -1023,11 +1023,11 @@ func TestTaggingUseful(t *testing.T) {
t.Fatal("Peers should be untagged but weren't") t.Fatal("Peers should be untagged but weren't")
} }
me.Engine.MessageSent(friend, msg) me.Engine.MessageSent(friend, msg)
time.Sleep(peerSampleInterval * 2) time.Sleep(8 * time.Millisecond)
if me.PeerTagger.count(me.Engine.tagUseful) != 1 { if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
t.Fatal("Peers should be tagged but weren't") t.Fatal("Peers should be tagged but weren't")
} }
time.Sleep(peerSampleInterval * 8) time.Sleep(peerSampleInterval * 10)
} }
if me.PeerTagger.count(me.Engine.tagUseful) == 0 { if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
......
...@@ -46,6 +46,7 @@ type MessageNetwork interface { ...@@ -46,6 +46,7 @@ type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
Latency(peer.ID) time.Duration Latency(peer.ID) time.Duration
Ping(context.Context, peer.ID) ping.Result Ping(context.Context, peer.ID) ping.Result
Self() peer.ID
} }
// MessageQueue implements queue of want messages to send to peers. // MessageQueue implements queue of want messages to send to peers.
......
...@@ -4,9 +4,9 @@ import ( ...@@ -4,9 +4,9 @@ import (
"context" "context"
"time" "time"
// lu "github.com/ipfs/go-bitswap/internal/logutil"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsgetter "github.com/ipfs/go-bitswap/internal/getter" bsgetter "github.com/ipfs/go-bitswap/internal/getter"
lu "github.com/ipfs/go-bitswap/internal/logutil"
notifications "github.com/ipfs/go-bitswap/internal/notifications" notifications "github.com/ipfs/go-bitswap/internal/notifications"
bspm "github.com/ipfs/go-bitswap/internal/peermanager" bspm "github.com/ipfs/go-bitswap/internal/peermanager"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
...@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { ...@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
// Search for providers who have the first want in the list. // Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have // Typically if the provider has the first block they will have
// the rest of the blocks also. // the rest of the blocks also.
log.Warnf("Ses%d: FindMorePeers with want 0 of %d wants", s.id, len(wants)) log.Warnf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, lu.C(wants[0]), len(wants))
s.findMorePeers(ctx, wants[0]) s.findMorePeers(ctx, wants[0])
} }
s.resetIdleTick() s.resetIdleTick()
......
...@@ -56,7 +56,7 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid { ...@@ -56,7 +56,7 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
func (sw *sessionWants) WantsSent(ks []cid.Cid) { func (sw *sessionWants) WantsSent(ks []cid.Cid) {
now := time.Now() now := time.Now()
for _, c := range ks { for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok { if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c) sw.toFetch.Remove(c)
sw.liveWants[c] = now sw.liveWants[c] = now
} }
...@@ -83,8 +83,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) ...@@ -83,8 +83,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
totalLatency += now.Sub(sentAt) totalLatency += now.Sub(sentAt)
} }
// Remove the CID from the live wants / toFetch queue and add it // Remove the CID from the live wants / toFetch queue
// to the past wants
delete(sw.liveWants, c) delete(sw.liveWants, c)
sw.toFetch.Remove(c) sw.toFetch.Remove(c)
} }
...@@ -96,6 +95,9 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) ...@@ -96,6 +95,9 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// PrepareBroadcast saves the current time for each live want and returns the // PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs. // live want CIDs.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid { func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now := time.Now() now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants)) live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants { for c := range sw.liveWants {
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
lu "github.com/ipfs/go-bitswap/internal/logutil"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
...@@ -298,16 +299,41 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) { ...@@ -298,16 +299,41 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
// processUpdates processes incoming blocks and HAVE / DONT_HAVEs. // processUpdates processes incoming blocks and HAVE / DONT_HAVEs.
// It returns all DONT_HAVEs. // It returns all DONT_HAVEs.
func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
prunePeers := make(map[peer.ID]struct{}) // Process received blocks keys
dontHaves := cid.NewSet() blkCids := cid.NewSet()
for _, upd := range updates { for _, upd := range updates {
// TODO: If there is a timeout for the want from the peer, remove want.sentTo for _, c := range upd.ks {
// so the want can be sent to another peer (and blacklist the peer?) blkCids.Add(c)
// TODO: If a peer is no longer available, check if all providers of log.Warnf("received block %s", lu.C(c))
// each CID have been exhausted // Remove the want
removed := sws.removeWant(c)
if removed != nil {
// Inform the peer tracker that this peer was the first to send
// us the block
sws.peerRspTrkr.receivedBlockFrom(upd.from)
}
delete(sws.peerConsecutiveDontHaves, upd.from)
}
}
// For each DONT_HAVE // Process received DONT_HAVEs
dontHaves := cid.NewSet()
prunePeers := make(map[peer.ID]struct{})
for _, upd := range updates {
for _, c := range upd.dontHaves { for _, c := range upd.dontHaves {
// Track the number of consecutive DONT_HAVEs each peer receives
if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
prunePeers[upd.from] = struct{}{}
} else {
sws.peerConsecutiveDontHaves[upd.from]++
}
// If we already received a block for the want, there's no need to
// update block presence etc
if blkCids.Has(c) {
continue
}
dontHaves.Add(c) dontHaves.Add(c)
// Update the block presence for the peer // Update the block presence for the peer
...@@ -322,40 +348,41 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { ...@@ -322,40 +348,41 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
sws.setWantSentTo(c, "") sws.setWantSentTo(c, "")
} }
} }
// Track the number of consecutive DONT_HAVEs each peer receives
if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
prunePeers[upd.from] = struct{}{}
} else {
sws.peerConsecutiveDontHaves[upd.from]++
}
} }
}
// For each HAVE // Process received HAVEs
for _, upd := range updates {
for _, c := range upd.haves { for _, c := range upd.haves {
// Update the block presence for the peer // If we haven't already received a block for the want
sws.updateWantBlockPresence(c, upd.from) if !blkCids.Has(c) {
delete(sws.peerConsecutiveDontHaves, upd.from) // Update the block presence for the peer
} sws.updateWantBlockPresence(c, upd.from)
// For each received block
for _, c := range upd.ks {
// Remove the want
removed := sws.removeWant(c)
if removed != nil {
// Inform the peer tracker that this peer was the first to send
// us the block
sws.peerRspTrkr.receivedBlockFrom(upd.from)
} }
// Clear the consecutive DONT_HAVE count for the peer
delete(sws.peerConsecutiveDontHaves, upd.from) delete(sws.peerConsecutiveDontHaves, upd.from)
delete(prunePeers, upd.from)
} }
} }
// If any peers have sent us too many consecutive DONT_HAVEs, remove them // If any peers have sent us too many consecutive DONT_HAVEs, remove them
// from the session // from the session
for p := range prunePeers {
// Before removing the peer from the session, check if the peer
// sent us a HAVE for a block that we want
for c := range sws.wants {
if sws.bpm.PeerHasBlock(p, c) {
delete(prunePeers, p)
break
}
}
}
if len(prunePeers) > 0 { if len(prunePeers) > 0 {
go func() { go func() {
for p := range prunePeers { for p := range prunePeers {
// Peer doesn't have anything we want, so remove it
log.Infof("peer %s sent too many dont haves", lu.P(p))
sws.SignalAvailability(p, false) sws.SignalAvailability(p, false)
} }
}() }()
......
...@@ -529,9 +529,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) { ...@@ -529,9 +529,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
// Add all cids as wants // Add all cids as wants
spm.Add(cids) spm.Add(cids)
// Receive a HAVE from peer (adds it to the session) // Receive a block from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete // Wait for processing to complete
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
...@@ -586,9 +585,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) { ...@@ -586,9 +585,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
// Add all cids as wants // Add all cids as wants
spm.Add(cids) spm.Add(cids)
// Receive a HAVE from peer (adds it to the session) // Receive a block from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
...@@ -642,9 +640,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { ...@@ -642,9 +640,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
// Add all cids as wants // Add all cids as wants
spm.Add(cids) spm.Add(cids)
// Receive a HAVE from peer (adds it to the session) // Receive a block from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{}) spm.Update(p, cids[:1], []cid.Cid{}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
...@@ -661,7 +658,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { ...@@ -661,7 +658,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
} }
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Session should remove peer // Session should remove peer
if has := fpm.HasPeer(p); has { if has := fpm.HasPeer(p); has {
...@@ -673,7 +670,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { ...@@ -673,7 +670,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}) spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Peer should be available // Peer should be available
if has := fpm.HasPeer(p); !has { if has := fpm.HasPeer(p); !has {
...@@ -689,7 +686,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { ...@@ -689,7 +686,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
} }
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Peer should be available // Peer should be available
if has := fpm.HasPeer(p); !has { if has := fpm.HasPeer(p); !has {
...@@ -703,10 +700,54 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) { ...@@ -703,10 +700,54 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
} }
// Wait for processing to complete // Wait for processing to complete
time.Sleep(5 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Session should remove peer // Session should remove peer
if has := fpm.HasPeer(p); has { if has := fpm.HasPeer(p); has {
t.Fatal("Expected peer not to be available") t.Fatal("Expected peer not to be available")
} }
} }
func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
// Add all cids as wants
spm.Add(cids)
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete
time.Sleep(10 * time.Millisecond)
// Peer should be available
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[1 : peerDontHaveLimit+5] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(20 * time.Millisecond)
// Peer should still be available because it has a block that we want.
// (We received a HAVE for cid 0 but didn't yet receive the block)
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"sync" "sync"
lu "github.com/ipfs/go-bitswap/internal/logutil"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
...@@ -61,7 +62,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool { ...@@ -61,7 +62,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
// connection // connection
spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue) spm.tagger.TagPeer(p, spm.tag, sessionPeerTagValue)
log.Infof("Added peer %s to session: %d peers\n", p, len(spm.peers)) log.Debugf("Added peer %s to session (%d peers)\n", p, len(spm.peers))
return true return true
} }
...@@ -77,6 +78,8 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool { ...@@ -77,6 +78,8 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
delete(spm.peers, p) delete(spm.peers, p)
spm.tagger.UntagPeer(p, spm.tag) spm.tagger.UntagPeer(p, spm.tag)
log.Debugf("Removed peer %s from session (%d peers)", lu.P(p), len(spm.peers))
return true return true
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment