Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
5030b8dd
Unverified
Commit
5030b8dd
authored
Jan 31, 2020
by
Steven Allen
Committed by
GitHub
Jan 31, 2020
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #247 from ipfs/fix/peer-manager-avail
Fix bug with signaling peer availability to sessions
parents
c0c31ed3
717c564e
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
36 additions
and
7 deletions
+36
-7
internal/peermanager/peermanager.go
internal/peermanager/peermanager.go
+23
-5
internal/peermanager/peermanager_test.go
internal/peermanager/peermanager_test.go
+13
-2
No files found.
internal/peermanager/peermanager.go
View file @
5030b8dd
...
@@ -129,6 +129,10 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
...
@@ -129,6 +129,10 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
pm
.
pwm
.
RemovePeer
(
p
)
pm
.
pwm
.
RemovePeer
(
p
)
}
}
// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
// the peer.
func
(
pm
*
PeerManager
)
BroadcastWantHaves
(
ctx
context
.
Context
,
wantHaves
[]
cid
.
Cid
)
{
func
(
pm
*
PeerManager
)
BroadcastWantHaves
(
ctx
context
.
Context
,
wantHaves
[]
cid
.
Cid
)
{
pm
.
pqLk
.
Lock
()
pm
.
pqLk
.
Lock
()
defer
pm
.
pqLk
.
Unlock
()
defer
pm
.
pqLk
.
Unlock
()
...
@@ -140,6 +144,8 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C
...
@@ -140,6 +144,8 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C
}
}
}
}
// SendWants sends the given want-blocks and want-haves to the given peer.
// It filters out wants that have previously been sent to the peer.
func
(
pm
*
PeerManager
)
SendWants
(
ctx
context
.
Context
,
p
peer
.
ID
,
wantBlocks
[]
cid
.
Cid
,
wantHaves
[]
cid
.
Cid
)
{
func
(
pm
*
PeerManager
)
SendWants
(
ctx
context
.
Context
,
p
peer
.
ID
,
wantBlocks
[]
cid
.
Cid
,
wantHaves
[]
cid
.
Cid
)
{
pm
.
pqLk
.
Lock
()
pm
.
pqLk
.
Lock
()
defer
pm
.
pqLk
.
Unlock
()
defer
pm
.
pqLk
.
Unlock
()
...
@@ -150,6 +156,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
...
@@ -150,6 +156,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
}
}
}
}
// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func
(
pm
*
PeerManager
)
SendCancels
(
ctx
context
.
Context
,
cancelKs
[]
cid
.
Cid
)
{
func
(
pm
*
PeerManager
)
SendCancels
(
ctx
context
.
Context
,
cancelKs
[]
cid
.
Cid
)
{
pm
.
pqLk
.
Lock
()
pm
.
pqLk
.
Lock
()
defer
pm
.
pqLk
.
Unlock
()
defer
pm
.
pqLk
.
Unlock
()
...
@@ -162,6 +170,7 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
...
@@ -162,6 +170,7 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
}
}
}
}
// CurrentWants returns the list of pending want-blocks
func
(
pm
*
PeerManager
)
CurrentWants
()
[]
cid
.
Cid
{
func
(
pm
*
PeerManager
)
CurrentWants
()
[]
cid
.
Cid
{
pm
.
pqLk
.
RLock
()
pm
.
pqLk
.
RLock
()
defer
pm
.
pqLk
.
RUnlock
()
defer
pm
.
pqLk
.
RUnlock
()
...
@@ -169,6 +178,7 @@ func (pm *PeerManager) CurrentWants() []cid.Cid {
...
@@ -169,6 +178,7 @@ func (pm *PeerManager) CurrentWants() []cid.Cid {
return
pm
.
pwm
.
GetWantBlocks
()
return
pm
.
pwm
.
GetWantBlocks
()
}
}
// CurrentWantHaves returns the list of pending want-haves
func
(
pm
*
PeerManager
)
CurrentWantHaves
()
[]
cid
.
Cid
{
func
(
pm
*
PeerManager
)
CurrentWantHaves
()
[]
cid
.
Cid
{
pm
.
pqLk
.
RLock
()
pm
.
pqLk
.
RLock
()
defer
pm
.
pqLk
.
RUnlock
()
defer
pm
.
pqLk
.
RUnlock
()
...
@@ -187,6 +197,8 @@ func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
...
@@ -187,6 +197,8 @@ func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
return
pqi
return
pqi
}
}
// RegisterSession tells the PeerManager that the given session is interested
// in events about the given peer.
func
(
pm
*
PeerManager
)
RegisterSession
(
p
peer
.
ID
,
s
Session
)
bool
{
func
(
pm
*
PeerManager
)
RegisterSession
(
p
peer
.
ID
,
s
Session
)
bool
{
pm
.
psLk
.
Lock
()
pm
.
psLk
.
Lock
()
defer
pm
.
psLk
.
Unlock
()
defer
pm
.
psLk
.
Unlock
()
...
@@ -204,6 +216,8 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
...
@@ -204,6 +216,8 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
return
ok
return
ok
}
}
// UnregisterSession tells the PeerManager that the given session is no longer
// interested in PeerManager events.
func
(
pm
*
PeerManager
)
UnregisterSession
(
ses
uint64
)
{
func
(
pm
*
PeerManager
)
UnregisterSession
(
ses
uint64
)
{
pm
.
psLk
.
Lock
()
pm
.
psLk
.
Lock
()
defer
pm
.
psLk
.
Unlock
()
defer
pm
.
psLk
.
Unlock
()
...
@@ -218,12 +232,16 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
...
@@ -218,12 +232,16 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
delete
(
pm
.
sessions
,
ses
)
delete
(
pm
.
sessions
,
ses
)
}
}
// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
func
(
pm
*
PeerManager
)
signalAvailability
(
p
peer
.
ID
,
isConnected
bool
)
{
func
(
pm
*
PeerManager
)
signalAvailability
(
p
peer
.
ID
,
isConnected
bool
)
{
for
p
,
sesIds
:=
range
pm
.
peerSessions
{
sesIds
,
ok
:=
pm
.
peerSessions
[
p
]
for
sesId
:=
range
sesIds
{
if
!
ok
{
if
s
,
ok
:=
pm
.
sessions
[
sesId
];
ok
{
return
s
.
SignalAvailability
(
p
,
isConnected
)
}
}
for
sesId
:=
range
sesIds
{
if
s
,
ok
:=
pm
.
sessions
[
sesId
];
ok
{
s
.
SignalAvailability
(
p
,
isConnected
)
}
}
}
}
}
}
internal/peermanager/peermanager_test.go
View file @
5030b8dd
...
@@ -272,8 +272,8 @@ func TestSessionRegistration(t *testing.T) {
...
@@ -272,8 +272,8 @@ func TestSessionRegistration(t *testing.T) {
msgs
:=
make
(
chan
msg
,
16
)
msgs
:=
make
(
chan
msg
,
16
)
peerQueueFactory
:=
makePeerQueueFactory
(
msgs
)
peerQueueFactory
:=
makePeerQueueFactory
(
msgs
)
tp
:=
testutil
.
GeneratePeers
(
2
)
tp
:=
testutil
.
GeneratePeers
(
3
)
self
,
p1
:=
tp
[
0
],
tp
[
1
]
self
,
p1
,
p2
:=
tp
[
0
],
tp
[
1
]
,
tp
[
2
]
peerManager
:=
New
(
ctx
,
peerQueueFactory
,
self
)
peerManager
:=
New
(
ctx
,
peerQueueFactory
,
self
)
id
:=
uint64
(
1
)
id
:=
uint64
(
1
)
...
@@ -282,16 +282,27 @@ func TestSessionRegistration(t *testing.T) {
...
@@ -282,16 +282,27 @@ func TestSessionRegistration(t *testing.T) {
if
s
.
available
[
p1
]
{
if
s
.
available
[
p1
]
{
t
.
Fatal
(
"Expected peer not be available till connected"
)
t
.
Fatal
(
"Expected peer not be available till connected"
)
}
}
peerManager
.
RegisterSession
(
p2
,
s
)
if
s
.
available
[
p2
]
{
t
.
Fatal
(
"Expected peer not be available till connected"
)
}
peerManager
.
Connected
(
p1
,
nil
)
peerManager
.
Connected
(
p1
,
nil
)
if
!
s
.
available
[
p1
]
{
if
!
s
.
available
[
p1
]
{
t
.
Fatal
(
"Expected signal callback"
)
t
.
Fatal
(
"Expected signal callback"
)
}
}
peerManager
.
Connected
(
p2
,
nil
)
if
!
s
.
available
[
p2
]
{
t
.
Fatal
(
"Expected signal callback"
)
}
peerManager
.
Disconnected
(
p1
)
peerManager
.
Disconnected
(
p1
)
if
s
.
available
[
p1
]
{
if
s
.
available
[
p1
]
{
t
.
Fatal
(
"Expected signal callback"
)
t
.
Fatal
(
"Expected signal callback"
)
}
}
if
!
s
.
available
[
p2
]
{
t
.
Fatal
(
"Expected signal callback only for disconnected peer"
)
}
peerManager
.
UnregisterSession
(
id
)
peerManager
.
UnregisterSession
(
id
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment