Commit bd8289f8 authored by Raúl Kripalani's avatar Raúl Kripalani

make Emitter an interface.

parent 1ab671b0
......@@ -6,18 +6,48 @@ import (
"reflect"
"sync"
"sync/atomic"
"github.com/libp2p/go-libp2p-core/event"
)
///////////////////////
// BUS
type CancelFunc = func()
// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.Mutex
nodes map[reflect.Type]*node
}
func NewBus() *basicBus {
var _ event.Bus = (*basicBus)(nil)
type Emitter struct {
n *node
typ reflect.Type
closed int32
dropper func(reflect.Type)
}
func (e *Emitter) Emit(evt interface{}) {
if atomic.LoadInt32(&e.closed) != 0 {
panic("emitter is closed")
}
e.n.emit(evt)
}
func (e *Emitter) Close() error {
if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) {
panic("closed an emitter more than once")
}
if atomic.AddInt32(&e.n.nEmitters, -1) == 0 {
e.dropper(e.typ)
}
return nil
}
func NewBus() event.Bus {
return &basicBus{
nodes: map[reflect.Type]*node{},
}
......@@ -74,7 +104,7 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubscriptionOpt) (c CancelFunc, err error) {
var settings subSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
......@@ -138,10 +168,12 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c Cancel
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) {
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOpt) (e event.Emitter, err error) {
var settings emitterSettings
for _, opt := range opts {
opt(&settings)
if err := opt(&settings); err != nil {
return nil, err
}
}
typ := reflect.TypeOf(evtType)
......@@ -152,22 +184,8 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFu
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
closed := false
n.keepLast = n.keepLast || settings.makeStateful
e = func(event interface{}) {
if closed {
panic("emitter is closed")
}
if event == closeEmit {
closed = true
if atomic.AddInt32(&n.nEmitters, -1) == 0 {
b.tryDropNode(typ)
}
return
}
n.emit(event)
}
e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode}
}, func(_ *node) {})
return
}
......@@ -212,20 +230,3 @@ func (n *node) emit(event interface{}) {
}
n.lk.RUnlock()
}
///////////////////////
// TYPES
var closeEmit struct{}
// EmitFunc emits events. If any channel subscribed to the topic is blocked,
// calls to EmitFunc will block
//
// Calling this function with wrong event type will cause a panic
type EmitFunc func(event interface{})
func (f EmitFunc) Close() {
f(closeEmit)
}
type CancelFunc func()
......@@ -38,13 +38,13 @@ func TestEmit(t *testing.T) {
<-events
}()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
}
func TestSub(t *testing.T) {
......@@ -66,13 +66,13 @@ func TestSub(t *testing.T) {
wait.Done()
}()
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventB(7))
em.Emit(EventB(7))
wait.Wait()
if event != 7 {
......@@ -83,23 +83,23 @@ func TestSub(t *testing.T) {
func TestEmitNoSubNoBlock(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
}
func TestEmitOnClosed(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
emit.Close()
em.Close()
defer func() {
r := recover()
......@@ -111,7 +111,7 @@ func TestEmitOnClosed(t *testing.T) {
}
}()
emit(EventA{})
em.Emit(EventA{})
}
func TestClosingRaces(t *testing.T) {
......@@ -187,15 +187,15 @@ func TestSubMany(t *testing.T) {
}()
}
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
ready.Wait()
emit(EventB(7))
em.Emit(EventB(7))
wait.Wait()
if int(r) != 7*n {
......@@ -222,13 +222,13 @@ func TestSubType(t *testing.T) {
wait.Done()
}()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
wait.Wait()
if event.String() != "Oh, Hello" {
......@@ -238,11 +238,11 @@ func TestSubType(t *testing.T) {
func TestNonStateful(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
......@@ -257,7 +257,7 @@ func TestNonStateful(t *testing.T) {
default:
}
emit(EventB(1))
em.Emit(EventB(1))
select {
case e := <-eventsA:
......@@ -284,13 +284,13 @@ func TestNonStateful(t *testing.T) {
func TestStateful(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventB), Stateful)
em, err := bus.Emitter(new(EventB), Stateful)
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventB(2))
em.Emit(EventB(2))
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
......@@ -337,19 +337,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
for i := 0; i < emits; i++ {
go func() {
emit, err := bus.Emitter(new(EventB), func(settings interface{}) error {
em, err := bus.Emitter(new(EventB), func(settings interface{}) error {
settings.(*emitterSettings).makeStateful = stateful
return nil
})
if err != nil {
panic(err)
}
defer emit.Close()
defer em.Close()
ready.Wait()
for i := 0; i < msgs; i++ {
emit(EventB(97))
em.Emit(EventB(97))
}
wait.Done()
......
......@@ -2,4 +2,7 @@ module github.com/libp2p/go-eventbus
go 1.12
require github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574
require (
github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574
github.com/libp2p/go-libp2p-core v0.0.4-0.20190619144953-3605e610067d
)
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574 h1:Pxjl8Wn3cCU7nB/MCmPEUMbjMHxXFqODW6rce0jpxB4=
github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574/go.mod h1:gynVu6LUw+xMXD3XEvjHQcIbJkWEamnGjJDebRHqTd0=
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-core v0.0.4-0.20190619144953-3605e610067d h1:x4SH+JE2mvT8g7vZDIX3MSMhulDwHiOOrUI9ZGU+Nfg=
github.com/libp2p/go-libp2p-core v0.0.4-0.20190619144953-3605e610067d/go.mod h1:j+YQMNz9WNSkNezXOsahp9kwZBKBvxLpKD316QWSJXE=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.1.0 h1:U41/2erhAKcmSI14xh/ZTUdBPOzDOIfS93ibzUSl8KM=
github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.4 h1:WgMSI84/eRLdbptXMkMWDXPjPq7SPLIgGUVm2eroyU4=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
......@@ -5,12 +5,14 @@ import (
"reflect"
)
type SubscriptionOpt = func(interface{}) error
type EmitterOpt = func(interface{}) error
type subSettings struct {
forcedType reflect.Type
}
type SubOption func(interface{}) error
// ForceSubType is a Subscribe option which overrides the type to which
// the subscription will be done. Note that the evtType must be assignable
// to channel type.
......@@ -27,7 +29,7 @@ type SubOption func(interface{}) error
// eventCh := make(chan fmt.Stringer) // interface { String() string }
// cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event)))
// [...]
func ForceSubType(evtType interface{}) SubOption {
func ForceSubType(evtType interface{}) SubscriptionOpt {
return func(settings interface{}) error {
s := settings.(*subSettings)
typ := reflect.TypeOf(evtType)
......@@ -42,7 +44,6 @@ func ForceSubType(evtType interface{}) SubOption {
type emitterSettings struct {
makeStateful bool
}
type EmitterOption func(interface{}) error
// Stateful is an Emitter option which makes makes the eventbus channel
// 'remember' last event sent, and when a new subscriber joins the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment