Unverified Commit 4afad1f6 authored by Łukasz Magiera's avatar Łukasz Magiera Committed by GitHub

Merge pull request #17 from libp2p/fix/with-node

nit: fix with-node
parents d88ec38b 7b280b5c
Pipeline #456 failed with stages
in 0 seconds
......@@ -51,7 +51,7 @@ func NewBus() event.Bus {
}
}
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) {
b.lk.Lock()
n, ok := b.nodes[typ]
......@@ -65,12 +65,14 @@ func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node))
cb(n)
go func() {
defer n.lk.Unlock()
async(n)
}()
return nil
if async == nil {
n.lk.Unlock()
} else {
go func() {
defer n.lk.Unlock()
async(n)
}()
}
}
func (b *basicBus) tryDropNode(typ reflect.Type) {
......@@ -168,7 +170,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
for i, etyp := range types {
typ := reflect.TypeOf(etyp)
err = b.withNode(typ.Elem(), func(n *node) {
b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, out.ch)
out.nodes[i] = n
}, func(n *node) {
......@@ -210,11 +212,11 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
}
typ = typ.Elem()
err = b.withNode(typ, func(n *node) {
b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
}, func(_ *node) {})
}, nil)
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment