Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-eventbus
Commits
df5be7d7
Unverified
Commit
df5be7d7
authored
Jun 27, 2019
by
Steven Allen
Committed by
GitHub
Jun 27, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #11 from libp2p/fix/things
Fix close deadlock and Sub type error
parents
0c299185
454cbe54
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
66 additions
and
5 deletions
+66
-5
basic.go
basic.go
+23
-5
basic_test.go
basic_test.go
+43
-0
No files found.
basic.go
View file @
df5be7d7
...
...
@@ -104,9 +104,21 @@ func (s *sub) Out() <-chan interface{} {
}
func
(
s
*
sub
)
Close
()
error
{
close
(
s
.
ch
)
stop
:=
make
(
chan
struct
{})
go
func
()
{
for
{
select
{
case
<-
s
.
ch
:
case
<-
stop
:
close
(
s
.
ch
)
return
}
}
}()
for
_
,
n
:=
range
s
.
nodes
{
n
.
lk
.
Lock
()
for
i
:=
0
;
i
<
len
(
n
.
sinks
);
i
++
{
if
n
.
sinks
[
i
]
==
s
.
ch
{
n
.
sinks
[
i
],
n
.
sinks
[
len
(
n
.
sinks
)
-
1
]
=
n
.
sinks
[
len
(
n
.
sinks
)
-
1
],
nil
...
...
@@ -114,12 +126,16 @@ func (s *sub) Close() error {
break
}
}
tryDrop
:=
len
(
n
.
sinks
)
==
0
&&
atomic
.
LoadInt32
(
&
n
.
nEmitters
)
==
0
n
.
lk
.
Unlock
()
if
tryDrop
{
s
.
dropper
(
n
.
typ
)
}
}
close
(
stop
)
return
nil
}
...
...
@@ -148,12 +164,14 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
dropper
:
b
.
tryDropNode
,
}
for
i
,
etyp
:=
range
types
{
typ
:=
reflect
.
TypeOf
(
etyp
)
if
typ
.
Kind
()
!=
reflect
.
Ptr
{
for
_
,
etyp
:=
range
types
{
if
reflect
.
TypeOf
(
etyp
)
.
Kind
()
!=
reflect
.
Ptr
{
return
nil
,
errors
.
New
(
"subscribe called with non-pointer type"
)
}
}
for
i
,
etyp
:=
range
types
{
typ
:=
reflect
.
TypeOf
(
etyp
)
err
=
b
.
withNode
(
typ
.
Elem
(),
func
(
n
*
node
)
{
n
.
sinks
=
append
(
n
.
sinks
,
out
.
ch
)
...
...
basic_test.go
View file @
df5be7d7
...
...
@@ -308,6 +308,49 @@ func TestStateful(t *testing.T) {
}
}
func
TestCloseBlocking
(
t
*
testing
.
T
)
{
bus
:=
NewBus
()
em
,
err
:=
bus
.
Emitter
(
new
(
EventB
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
sub
,
err
:=
bus
.
Subscribe
(
new
(
EventB
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
go
func
()
{
em
.
Emit
(
EventB
(
159
))
}()
time
.
Sleep
(
10
*
time
.
Millisecond
)
// make sure that emit is blocked
sub
.
Close
()
// cancel sub
}
func
panicOnTimeout
(
d
time
.
Duration
)
{
<-
time
.
After
(
d
)
panic
(
"timeout reached"
)
}
func
TestSubFailFully
(
t
*
testing
.
T
)
{
bus
:=
NewBus
()
em
,
err
:=
bus
.
Emitter
(
new
(
EventB
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
_
,
err
=
bus
.
Subscribe
([]
interface
{}{
new
(
EventB
),
5
})
if
err
==
nil
||
err
.
Error
()
!=
"subscribe called with non-pointer type"
{
t
.
Fatal
(
err
)
}
go
panicOnTimeout
(
5
*
time
.
Second
)
em
.
Emit
(
EventB
(
159
))
// will hang if sub doesn't fail properly
}
func
testMany
(
t
testing
.
TB
,
subs
,
emits
,
msgs
int
,
stateful
bool
)
{
if
race
.
WithRace
()
&&
subs
+
emits
>
5000
{
t
.
SkipNow
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment