Unverified Commit 9ddd13e7 authored by Steven Allen, committed by GitHub

Merge pull request #261 from ipfs/fix/prune-session-peers

Prune peers that send too many consecutive DONT_HAVEs
parents f4c97028 4d2bdc27
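At its core, the change keeps a per-peer count of consecutive DONT_HAVEs, resets that count whenever the peer sends a HAVE or a block, and removes the peer from the session (via SignalAvailability(p, false)) once the count exceeds peerDontHaveLimit. The standalone sketch below only illustrates that bookkeeping under simplified assumptions: the dontHaveTracker type and its methods are hypothetical names, and peers are plain strings rather than peer.IDs; the real logic lives in sessionwantsender.go as shown in the diff.

package main

import "fmt"

// peerDontHaveLimit mirrors the constant introduced by this change: once a peer
// has sent this many DONT_HAVEs in a row, the next one triggers pruning.
const peerDontHaveLimit = 16

// dontHaveTracker is a hypothetical, stripped-down illustration of the per-peer
// bookkeeping; it is not the session implementation itself.
type dontHaveTracker struct {
	consecutive map[string]int
}

func newDontHaveTracker() *dontHaveTracker {
	return &dontHaveTracker{consecutive: make(map[string]int)}
}

// onDontHave records a DONT_HAVE and reports whether the peer should now be
// pruned (i.e. the counter had already reached the limit).
func (t *dontHaveTracker) onDontHave(p string) bool {
	if t.consecutive[p] == peerDontHaveLimit {
		return true
	}
	t.consecutive[p]++
	return false
}

// onHaveOrBlock resets the streak: the DONT_HAVEs are no longer consecutive.
func (t *dontHaveTracker) onHaveOrBlock(p string) {
	delete(t.consecutive, p)
}

func main() {
	t := newDontHaveTracker()
	for i := 0; i < peerDontHaveLimit; i++ {
		t.onDontHave("peerA") // reaches the limit but does not prune yet
	}
	fmt.Println(t.onDontHave("peerA")) // true: one more DONT_HAVE prunes the peer

	t.onHaveOrBlock("peerA")           // a HAVE or block resets the streak
	fmt.Println(t.onDontHave("peerA")) // false: counting starts over
}

Resetting the streak on any HAVE or block is what TestConsecutiveDontHaveLimitInterrupted below relies on: interleaved HAVEs keep the peer in the session even when the total number of DONT_HAVEs passes the limit.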
@@ -9,8 +9,13 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

const (
// Maximum number of changes to accept before blocking
changesBufferSize = 128

// If the session receives this many DONT_HAVEs in a row from a peer,
// it prunes the peer from the session
peerDontHaveLimit = 16
)

// BlockPresence indicates whether a peer has a block.
// Note that the order is important, we decide which peer to send a want to
@@ -76,13 +81,14 @@ type sessionWantSender struct {
changes chan change
// Information about each want indexed by CID
wants map[cid.Cid]*wantInfo
// Keeps track of how many consecutive DONT_HAVEs a peer has sent
peerConsecutiveDontHaves map[peer.ID]int
// Tracks which peers we have sent want-block to
swbt *sentWantBlocksTracker
// Maintains a list of peers and whether they are connected
peerAvlMgr *peerAvailabilityManager
// Tracks the number of blocks each peer sent us
peerRspTrkr *peerResponseTracker
// Sends wants to peers
pm PeerManager
// Keeps track of which peer has / doesn't have a block
@@ -101,6 +107,7 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, bpm *
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
peerConsecutiveDontHaves: make(map[peer.ID]int),
swbt: newSentWantBlocksTracker(),
peerAvlMgr: newPeerAvailabilityManager(),
peerRspTrkr: newPeerResponseTracker(),
@@ -258,6 +265,9 @@ func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool)
if isNowAvailable {
newlyAvailable = append(newlyAvailable, p)
}
// Reset the count of consecutive DONT_HAVEs received from the
// peer
delete(spm.peerConsecutiveDontHaves, p)
}
}
}
@@ -265,6 +275,12 @@ func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool)
return newlyAvailable
}
// isAvailable indicates whether the peer is available and whether
// it's been tracked by the Session (used by the tests)
func (spm *sessionWantSender) isAvailable(p peer.ID) (bool, bool) {
return spm.peerAvlMgr.isAvailable(p)
}
// trackWant creates a new entry in the map of CID -> want info
func (spm *sessionWantSender) trackWant(c cid.Cid) {
// fmt.Printf("trackWant %s\n", lu.C(c))
@@ -285,6 +301,7 @@ func (spm *sessionWantSender) trackWant(c cid.Cid) {
// processUpdates processes incoming blocks and HAVE / DONT_HAVEs
func (spm *sessionWantSender) processUpdates(updates []update) {
prunePeers := make(map[peer.ID]struct{})
dontHaves := cid.NewSet()
for _, upd := range updates {
// TODO: If there is a timeout for the want from the peer, remove want.sentTo
@@ -308,12 +325,20 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.setWantSentTo(c, "")
}
}
// Track the number of consecutive DONT_HAVEs each peer receives
if spm.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
prunePeers[upd.from] = struct{}{}
} else {
spm.peerConsecutiveDontHaves[upd.from]++
}
}
// For each HAVE
for _, c := range upd.haves {
// Update the block presence for the peer
spm.updateWantBlockPresence(c, upd.from)
delete(spm.peerConsecutiveDontHaves, upd.from)
}
// For each received block
@@ -325,6 +350,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
// us the block
spm.peerRspTrkr.receivedBlockFrom(upd.from)
}
delete(spm.peerConsecutiveDontHaves, upd.from)
}
}
@@ -337,6 +363,12 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.onPeersExhausted(newlyExhausted)
}
}
// If any peers have sent us too many consecutive DONT_HAVEs, remove them
// from the session
for p := range prunePeers {
spm.SignalAvailability(p, false)
}
}
// convenience structs for passing around want-blocks and want-haves for a peer
...
@@ -346,3 +346,198 @@ func TestPeersExhausted(t *testing.T) {
t.Fatal("Wrong keys")
}
}
func TestConsecutiveDontHaveLimit(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
go spm.Run()
// Add all cids as wants
spm.Add(cids)
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that do not exceed limit
for _, c := range cids[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}
func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
go spm.Run()
// Add all cids as wants
spm.Add(cids)
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVE then HAVE then DONT_HAVE from peer,
// where consecutive DONT_HAVEs would have exceeded limit
// (but they are not consecutive)
for _, c := range cids[1:peerDontHaveLimit] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] {
// HAVEs
bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false)
}
for _, c := range cids[peerDontHaveLimit+1:] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
}
func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
go spm.Run()
// Add all cids as wants
spm.Add(cids)
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[1 : peerDontHaveLimit+2] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
// Receive a HAVE from peer (adds it back into the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
cids2 := testutil.GenerateCids(peerDontHaveLimit + 10)
// Receive DONT_HAVEs from peer that don't exceed limit
for _, c := range cids2[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids2[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}