Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
c3c0ad1b
Commit
c3c0ad1b
authored
Mar 24, 2020
by
Dirk McCormick
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix: wait for sessionWantSender to shutdown before completing session shutdown
parent
9b89a3b1
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
24 additions
and
9 deletions
+24
-9
internal/session/session.go
internal/session/session.go
+3
-0
internal/session/sessionwantsender.go
internal/session/sessionwantsender.go
+21
-9
No files found.
internal/session/session.go
View file @
c3c0ad1b
...
...
@@ -387,6 +387,9 @@ func (s *Session) handleShutdown() {
s
.
idleTick
.
Stop
()
// Shut down the session peer manager
s
.
sprm
.
Shutdown
()
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s
.
sws
.
Shutdown
()
// Remove the session from the want manager
s
.
wm
.
RemoveSession
(
s
.
ctx
,
s
.
id
)
}
...
...
internal/session/sessionwantsender.go
View file @
c3c0ad1b
...
...
@@ -71,8 +71,11 @@ type onPeersExhaustedFn func([]cid.Cid)
// consults the peer response tracker (records which peers sent us blocks).
//
type
sessionWantSender
struct
{
//
When t
he context is
cancelled, sessionWantSender shuts down
//
T
he context is
used when sending wants
ctx
context
.
Context
// The sessionWantSender uses these channels when it's shutting down
closing
chan
struct
{}
closed
chan
struct
{}
// The session ID
sessionID
uint64
// A channel that collects incoming changes (events)
...
...
@@ -102,6 +105,8 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm S
sws
:=
sessionWantSender
{
ctx
:
ctx
,
closing
:
make
(
chan
struct
{}),
closed
:
make
(
chan
struct
{}),
sessionID
:
sid
,
changes
:
make
(
chan
change
,
changesBufferSize
),
wants
:
make
(
map
[
cid
.
Cid
]
*
wantInfo
),
...
...
@@ -157,26 +162,33 @@ func (sws *sessionWantSender) Run() {
select
{
case
ch
:=
<-
sws
.
changes
:
sws
.
onChange
([]
change
{
ch
})
case
<-
sws
.
ctx
.
Done
()
:
sws
.
shutdown
()
case
<-
sws
.
closing
:
// Close the 'closed' channel to signal to Shutdown() that the run
// loop has exited
close
(
sws
.
closed
)
return
}
}
}
// Shutdown the sessionWantSender
func
(
sws
*
sessionWantSender
)
Shutdown
()
{
// Signal to the run loop to stop processing
close
(
sws
.
closing
)
// Unregister the session with the PeerManager
sws
.
pm
.
UnregisterSession
(
sws
.
sessionID
)
// Wait for run loop to complete
<-
sws
.
closed
}
// addChange adds a new change to the queue
func
(
sws
*
sessionWantSender
)
addChange
(
c
change
)
{
select
{
case
sws
.
changes
<-
c
:
case
<-
sws
.
c
tx
.
Done
()
:
case
<-
sws
.
c
losing
:
}
}
// shutdown unregisters the session with the PeerManager
func
(
sws
*
sessionWantSender
)
shutdown
()
{
sws
.
pm
.
UnregisterSession
(
sws
.
sessionID
)
}
// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func
(
sws
*
sessionWantSender
)
collectChanges
(
changes
[]
change
)
[]
change
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment