Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
1247b02d
Commit
1247b02d
authored
Mar 05, 2020
by
Dirk McCormick
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix: overly aggressive session peer removal
parent
b647e029
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
114 additions
and
35 deletions
+114
-35
internal/messagequeue/messagequeue.go
internal/messagequeue/messagequeue.go
+1
-0
internal/session/session.go
internal/session/session.go
+2
-2
internal/session/sessionwants.go
internal/session/sessionwants.go
+5
-3
internal/session/sessionwantsender.go
internal/session/sessionwantsender.go
+52
-20
internal/session/sessionwantsender_test.go
internal/session/sessionwantsender_test.go
+50
-9
internal/sessionpeermanager/sessionpeermanager.go
internal/sessionpeermanager/sessionpeermanager.go
+4
-1
No files found.
internal/messagequeue/messagequeue.go
View file @
1247b02d
...
@@ -46,6 +46,7 @@ type MessageNetwork interface {
...
@@ -46,6 +46,7 @@ type MessageNetwork interface {
NewMessageSender
(
context
.
Context
,
peer
.
ID
)
(
bsnet
.
MessageSender
,
error
)
NewMessageSender
(
context
.
Context
,
peer
.
ID
)
(
bsnet
.
MessageSender
,
error
)
Latency
(
peer
.
ID
)
time
.
Duration
Latency
(
peer
.
ID
)
time
.
Duration
Ping
(
context
.
Context
,
peer
.
ID
)
ping
.
Result
Ping
(
context
.
Context
,
peer
.
ID
)
ping
.
Result
Self
()
peer
.
ID
}
}
// MessageQueue implements queue of want messages to send to peers.
// MessageQueue implements queue of want messages to send to peers.
...
...
internal/session/session.go
View file @
1247b02d
...
@@ -4,9 +4,9 @@ import (
...
@@ -4,9 +4,9 @@ import (
"context"
"context"
"time"
"time"
// lu "github.com/ipfs/go-bitswap/internal/logutil"
bsbpm
"github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsbpm
"github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsgetter
"github.com/ipfs/go-bitswap/internal/getter"
bsgetter
"github.com/ipfs/go-bitswap/internal/getter"
lu
"github.com/ipfs/go-bitswap/internal/logutil"
notifications
"github.com/ipfs/go-bitswap/internal/notifications"
notifications
"github.com/ipfs/go-bitswap/internal/notifications"
bspm
"github.com/ipfs/go-bitswap/internal/peermanager"
bspm
"github.com/ipfs/go-bitswap/internal/peermanager"
bssim
"github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
bssim
"github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
...
@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
...
@@ -340,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
// Search for providers who have the first want in the list.
// Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have
// Typically if the provider has the first block they will have
// the rest of the blocks also.
// the rest of the blocks also.
log
.
Warnf
(
"Ses%d: FindMorePeers with want
0
of %d wants"
,
s
.
id
,
len
(
wants
))
log
.
Warnf
(
"Ses%d: FindMorePeers with want
%s (1st
of %d wants
)
"
,
s
.
id
,
lu
.
C
(
wants
[
0
]),
len
(
wants
))
s
.
findMorePeers
(
ctx
,
wants
[
0
])
s
.
findMorePeers
(
ctx
,
wants
[
0
])
}
}
s
.
resetIdleTick
()
s
.
resetIdleTick
()
...
...
internal/session/sessionwants.go
View file @
1247b02d
...
@@ -56,7 +56,7 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
...
@@ -56,7 +56,7 @@ func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
func
(
sw
*
sessionWants
)
WantsSent
(
ks
[]
cid
.
Cid
)
{
func
(
sw
*
sessionWants
)
WantsSent
(
ks
[]
cid
.
Cid
)
{
now
:=
time
.
Now
()
now
:=
time
.
Now
()
for
_
,
c
:=
range
ks
{
for
_
,
c
:=
range
ks
{
if
_
,
ok
:=
sw
.
liveWants
[
c
];
!
ok
{
if
_
,
ok
:=
sw
.
liveWants
[
c
];
!
ok
&&
sw
.
toFetch
.
Has
(
c
)
{
sw
.
toFetch
.
Remove
(
c
)
sw
.
toFetch
.
Remove
(
c
)
sw
.
liveWants
[
c
]
=
now
sw
.
liveWants
[
c
]
=
now
}
}
...
@@ -83,8 +83,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
...
@@ -83,8 +83,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
totalLatency
+=
now
.
Sub
(
sentAt
)
totalLatency
+=
now
.
Sub
(
sentAt
)
}
}
// Remove the CID from the live wants / toFetch queue and add it
// Remove the CID from the live wants / toFetch queue
// to the past wants
delete
(
sw
.
liveWants
,
c
)
delete
(
sw
.
liveWants
,
c
)
sw
.
toFetch
.
Remove
(
c
)
sw
.
toFetch
.
Remove
(
c
)
}
}
...
@@ -96,6 +95,9 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
...
@@ -96,6 +95,9 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// PrepareBroadcast saves the current time for each live want and returns the
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
// live want CIDs.
func
(
sw
*
sessionWants
)
PrepareBroadcast
()
[]
cid
.
Cid
{
func
(
sw
*
sessionWants
)
PrepareBroadcast
()
[]
cid
.
Cid
{
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now
:=
time
.
Now
()
now
:=
time
.
Now
()
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
sw
.
liveWants
))
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
sw
.
liveWants
))
for
c
:=
range
sw
.
liveWants
{
for
c
:=
range
sw
.
liveWants
{
...
...
internal/session/sessionwantsender.go
View file @
1247b02d
...
@@ -4,6 +4,7 @@ import (
...
@@ -4,6 +4,7 @@ import (
"context"
"context"
bsbpm
"github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsbpm
"github.com/ipfs/go-bitswap/internal/blockpresencemanager"
lu
"github.com/ipfs/go-bitswap/internal/logutil"
cid
"github.com/ipfs/go-cid"
cid
"github.com/ipfs/go-cid"
peer
"github.com/libp2p/go-libp2p-core/peer"
peer
"github.com/libp2p/go-libp2p-core/peer"
...
@@ -298,16 +299,34 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
...
@@ -298,16 +299,34 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
// processUpdates processes incoming blocks and HAVE / DONT_HAVEs.
// processUpdates processes incoming blocks and HAVE / DONT_HAVEs.
// It returns all DONT_HAVEs.
// It returns all DONT_HAVEs.
func
(
sws
*
sessionWantSender
)
processUpdates
(
updates
[]
update
)
[]
cid
.
Cid
{
func
(
sws
*
sessionWantSender
)
processUpdates
(
updates
[]
update
)
[]
cid
.
Cid
{
prunePeers
:=
make
(
map
[
peer
.
ID
]
struct
{})
// Process received blocks keys
dontHave
s
:=
cid
.
NewSet
()
blkCid
s
:=
cid
.
NewSet
()
for
_
,
upd
:=
range
updates
{
for
_
,
upd
:=
range
updates
{
// TODO: If there is a timeout for the want from the peer, remove want.sentTo
for
_
,
c
:=
range
upd
.
ks
{
// so the want can be sent to another peer (and blacklist the peer?)
blkCids
.
Add
(
c
)
// TODO: If a peer is no longer available, check if all providers of
log
.
Warnf
(
"received block %s"
,
lu
.
C
(
c
))
// each CID have been exhausted
// Remove the want
removed
:=
sws
.
removeWant
(
c
)
if
removed
!=
nil
{
// Inform the peer tracker that this peer was the first to send
// us the block
sws
.
peerRspTrkr
.
receivedBlockFrom
(
upd
.
from
)
}
delete
(
sws
.
peerConsecutiveDontHaves
,
upd
.
from
)
}
}
// For each DONT_HAVE
// Process received DONT_HAVEs
dontHaves
:=
cid
.
NewSet
()
prunePeers
:=
make
(
map
[
peer
.
ID
]
struct
{})
for
_
,
upd
:=
range
updates
{
for
_
,
c
:=
range
upd
.
dontHaves
{
for
_
,
c
:=
range
upd
.
dontHaves
{
// If we already received a block for the want, ignore any
// DONT_HAVE for the want
if
blkCids
.
Has
(
c
)
{
continue
}
dontHaves
.
Add
(
c
)
dontHaves
.
Add
(
c
)
// Update the block presence for the peer
// Update the block presence for the peer
...
@@ -330,24 +349,23 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
...
@@ -330,24 +349,23 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
sws
.
peerConsecutiveDontHaves
[
upd
.
from
]
++
sws
.
peerConsecutiveDontHaves
[
upd
.
from
]
++
}
}
}
}
}
// For each HAVE
// Process received HAVEs
for
_
,
upd
:=
range
updates
{
for
_
,
c
:=
range
upd
.
haves
{
for
_
,
c
:=
range
upd
.
haves
{
// If we already received a block for the want, ignore any HAVE for
// the want
if
blkCids
.
Has
(
c
)
{
continue
}
// Update the block presence for the peer
// Update the block presence for the peer
sws
.
updateWantBlockPresence
(
c
,
upd
.
from
)
sws
.
updateWantBlockPresence
(
c
,
upd
.
from
)
delete
(
sws
.
peerConsecutiveDontHaves
,
upd
.
from
)
}
// For each received block
// Clear the consecutive DONT_HAVE count for the peer
for
_
,
c
:=
range
upd
.
ks
{
// Remove the want
removed
:=
sws
.
removeWant
(
c
)
if
removed
!=
nil
{
// Inform the peer tracker that this peer was the first to send
// us the block
sws
.
peerRspTrkr
.
receivedBlockFrom
(
upd
.
from
)
}
delete
(
sws
.
peerConsecutiveDontHaves
,
upd
.
from
)
delete
(
sws
.
peerConsecutiveDontHaves
,
upd
.
from
)
delete
(
prunePeers
,
upd
.
from
)
}
}
}
}
...
@@ -356,8 +374,22 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
...
@@ -356,8 +374,22 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
if
len
(
prunePeers
)
>
0
{
if
len
(
prunePeers
)
>
0
{
go
func
()
{
go
func
()
{
for
p
:=
range
prunePeers
{
for
p
:=
range
prunePeers
{
// Before removing the peer from the session, check if the peer
// sent us a HAVE for a block that we want
peerHasWantedBlock
:=
false
for
c
:=
range
sws
.
wants
{
if
sws
.
bpm
.
PeerHasBlock
(
p
,
c
)
{
peerHasWantedBlock
=
true
break
}
}
// Peer doesn't have anything we want, so remove it
if
!
peerHasWantedBlock
{
log
.
Infof
(
"peer %s sent too many dont haves"
,
lu
.
P
(
p
))
sws
.
SignalAvailability
(
p
,
false
)
sws
.
SignalAvailability
(
p
,
false
)
}
}
}
}()
}()
}
}
...
...
internal/session/sessionwantsender_test.go
View file @
1247b02d
...
@@ -476,9 +476,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
...
@@ -476,9 +476,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
// Add all cids as wants
// Add all cids as wants
spm
.
Add
(
cids
)
spm
.
Add
(
cids
)
// Receive a HAVE from peer (adds it to the session)
// Receive a block from peer (adds it to the session)
bpm
.
ReceiveFrom
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{},
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
[]
cid
.
Cid
{},
cids
[
:
1
],
[]
cid
.
Cid
{})
// Wait for processing to complete
// Wait for processing to complete
time
.
Sleep
(
10
*
time
.
Millisecond
)
time
.
Sleep
(
10
*
time
.
Millisecond
)
...
@@ -533,9 +532,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
...
@@ -533,9 +532,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
// Add all cids as wants
// Add all cids as wants
spm
.
Add
(
cids
)
spm
.
Add
(
cids
)
// Receive a HAVE from peer (adds it to the session)
// Receive a block from peer (adds it to the session)
bpm
.
ReceiveFrom
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{},
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
[]
cid
.
Cid
{},
cids
[
:
1
],
[]
cid
.
Cid
{})
// Wait for processing to complete
// Wait for processing to complete
time
.
Sleep
(
5
*
time
.
Millisecond
)
time
.
Sleep
(
5
*
time
.
Millisecond
)
...
@@ -589,9 +587,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
...
@@ -589,9 +587,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
// Add all cids as wants
// Add all cids as wants
spm
.
Add
(
cids
)
spm
.
Add
(
cids
)
// Receive a HAVE from peer (adds it to the session)
// Receive a block from peer (adds it to the session)
bpm
.
ReceiveFrom
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{},
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
[]
cid
.
Cid
{},
cids
[
:
1
],
[]
cid
.
Cid
{})
// Wait for processing to complete
// Wait for processing to complete
time
.
Sleep
(
5
*
time
.
Millisecond
)
time
.
Sleep
(
5
*
time
.
Millisecond
)
...
@@ -657,3 +654,47 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
...
@@ -657,3 +654,47 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
t
.
Fatal
(
"Expected peer not to be available"
)
t
.
Fatal
(
"Expected peer not to be available"
)
}
}
}
}
func
TestConsecutiveDontHaveDontRemoveIfHasWantedBlock
(
t
*
testing
.
T
)
{
cids
:=
testutil
.
GenerateCids
(
peerDontHaveLimit
+
10
)
p
:=
testutil
.
GeneratePeers
(
1
)[
0
]
sid
:=
uint64
(
1
)
pm
:=
newMockPeerManager
()
fpm
:=
newFakeSessionPeerManager
()
bpm
:=
bsbpm
.
New
()
onSend
:=
func
(
peer
.
ID
,
[]
cid
.
Cid
,
[]
cid
.
Cid
)
{}
onPeersExhausted
:=
func
([]
cid
.
Cid
)
{}
spm
:=
newSessionWantSender
(
context
.
Background
(),
sid
,
pm
,
fpm
,
bpm
,
onSend
,
onPeersExhausted
)
go
spm
.
Run
()
// Add all cids as wants
spm
.
Add
(
cids
)
// Receive a HAVE from peer (adds it to the session)
bpm
.
ReceiveFrom
(
p
,
cids
[
:
1
],
[]
cid
.
Cid
{})
spm
.
Update
(
p
,
[]
cid
.
Cid
{},
cids
[
:
1
],
[]
cid
.
Cid
{})
// Wait for processing to complete
time
.
Sleep
(
10
*
time
.
Millisecond
)
// Peer should be available
if
has
:=
fpm
.
HasPeer
(
p
);
!
has
{
t
.
Fatal
(
"Expected peer to be available"
)
}
// Receive DONT_HAVEs from peer that exceed limit
for
_
,
c
:=
range
cids
[
1
:
peerDontHaveLimit
+
5
]
{
bpm
.
ReceiveFrom
(
p
,
[]
cid
.
Cid
{},
[]
cid
.
Cid
{
c
})
spm
.
Update
(
p
,
[]
cid
.
Cid
{},
[]
cid
.
Cid
{},
[]
cid
.
Cid
{
c
})
}
// Wait for processing to complete
time
.
Sleep
(
20
*
time
.
Millisecond
)
// Peer should still be available because it has a block that we want.
// (We received a HAVE for cid 0 but didn't yet receive the block)
if
has
:=
fpm
.
HasPeer
(
p
);
!
has
{
t
.
Fatal
(
"Expected peer to be available"
)
}
}
internal/sessionpeermanager/sessionpeermanager.go
View file @
1247b02d
...
@@ -4,6 +4,7 @@ import (
...
@@ -4,6 +4,7 @@ import (
"fmt"
"fmt"
"sync"
"sync"
lu
"github.com/ipfs/go-bitswap/internal/logutil"
logging
"github.com/ipfs/go-log"
logging
"github.com/ipfs/go-log"
peer
"github.com/libp2p/go-libp2p-core/peer"
peer
"github.com/libp2p/go-libp2p-core/peer"
...
@@ -61,7 +62,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
...
@@ -61,7 +62,7 @@ func (spm *SessionPeerManager) AddPeer(p peer.ID) bool {
// connection
// connection
spm
.
tagger
.
TagPeer
(
p
,
spm
.
tag
,
sessionPeerTagValue
)
spm
.
tagger
.
TagPeer
(
p
,
spm
.
tag
,
sessionPeerTagValue
)
log
.
Info
f
(
"Added peer %s to session
:
%d peers
\n
"
,
p
,
len
(
spm
.
peers
))
log
.
Debug
f
(
"Added peer %s to session
(
%d peers
)
\n
"
,
p
,
len
(
spm
.
peers
))
return
true
return
true
}
}
...
@@ -77,6 +78,8 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
...
@@ -77,6 +78,8 @@ func (spm *SessionPeerManager) RemovePeer(p peer.ID) bool {
delete
(
spm
.
peers
,
p
)
delete
(
spm
.
peers
,
p
)
spm
.
tagger
.
UntagPeer
(
p
,
spm
.
tag
)
spm
.
tagger
.
UntagPeer
(
p
,
spm
.
tag
)
log
.
Debugf
(
"Removed peer %s from session (%d peers)"
,
lu
.
P
(
p
),
len
(
spm
.
peers
))
return
true
return
true
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment