Commit 6212a929 authored by Łukasz Magiera's avatar Łukasz Magiera Committed by Raúl Kripalani

refactor interfaces (#9)

parent f100eac4
Pipeline #455 failed with stages
in 0 seconds
......@@ -21,21 +21,21 @@ type basicBus struct {
var _ event.Bus = (*basicBus)(nil)
type Emitter struct {
type emitter struct {
n *node
typ reflect.Type
closed int32
dropper func(reflect.Type)
}
func (e *Emitter) Emit(evt interface{}) {
func (e *emitter) Emit(evt interface{}) {
if atomic.LoadInt32(&e.closed) != 0 {
panic("emitter is closed")
}
e.n.emit(evt)
}
func (e *Emitter) Close() error {
func (e *emitter) Close() error {
if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) {
panic("closed an emitter more than once")
}
......@@ -93,16 +93,42 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
b.lk.Unlock()
}
type sub struct {
ch chan interface{}
nodes []*node
dropper func(reflect.Type)
}
func (s *sub) Out() <-chan interface{} {
return s.ch
}
func (s *sub) Close() error {
close(s.ch)
for _, n := range s.nodes {
n.lk.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == s.ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]
break
}
}
tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
n.lk.Unlock()
if tryDrop {
s.dropper(n.typ)
}
}
return nil
}
var _ event.Subscription = (*sub)(nil)
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOpt) (c event.CancelFunc, err error) {
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
var settings subSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
......@@ -110,50 +136,40 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOp
}
}
refCh := reflect.ValueOf(typedChan)
typ := refCh.Type()
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
types, ok := evtTypes.([]interface{})
if !ok {
types = []interface{}{evtTypes}
}
if typ.ChanDir()&reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
out := &sub{
ch: make(chan interface{}, settings.buffer),
nodes: make([]*node, len(types)),
dropper: b.tryDropNode,
}
if settings.forcedType != nil {
if settings.forcedType.Elem().AssignableTo(typ) {
return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
for i, etyp := range types {
typ := reflect.TypeOf(etyp)
if typ.Kind() != reflect.Ptr {
return nil, errors.New("subscribe called with non-pointer type")
}
typ = settings.forcedType
}
err = b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, refCh)
c = func() {
n.lk.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == refCh {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
n.sinks = n.sinks[:len(n.sinks)-1]
break
err = b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, out.ch)
out.nodes[i] = n
}, func(n *node) {
if n.keepLast {
l := n.last.Load()
if l == nil {
return
}
out.ch <- l
}
tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
n.lk.Unlock()
if tryDrop {
b.tryDropNode(typ.Elem())
}
}
}, func(n *node) {
if n.keepLast {
lastVal, ok := n.last.Load().(reflect.Value)
if !ok {
return
}
})
}
refCh.Send(lastVal)
}
})
return
return out, nil
}
// Emitter creates new emitter
......@@ -183,7 +199,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
}, func(_ *node) {})
return
}
......@@ -203,7 +219,7 @@ type node struct {
keepLast bool
last atomic.Value
sinks []reflect.Value
sinks []chan interface{}
}
func newNode(typ reflect.Type) *node {
......@@ -220,11 +236,11 @@ func (n *node) emit(event interface{}) {
n.lk.RLock()
if n.keepLast {
n.last.Store(eval)
n.last.Store(event)
}
for _, ch := range n.sinks {
ch.Send(eval)
ch <- event
}
n.lk.RUnlock()
}
......@@ -27,15 +27,14 @@ func (EventA) String() string {
func TestEmit(t *testing.T) {
bus := NewBus()
events := make(chan EventA)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}
go func() {
defer cancel()
<-events
defer sub.Close()
<-sub.Out()
}()
em, err := bus.Emitter(new(EventA))
......@@ -49,8 +48,7 @@ func TestEmit(t *testing.T) {
func TestSub(t *testing.T) {
bus := NewBus()
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
t.Fatal(err)
}
......@@ -61,8 +59,8 @@ func TestSub(t *testing.T) {
wait.Add(1)
go func() {
defer cancel()
event = <-events
defer sub.Close()
event = (<-sub.Out()).(EventB)
wait.Done()
}()
......@@ -131,9 +129,9 @@ func TestClosingRaces(t *testing.T) {
lk.RLock()
defer lk.RUnlock()
cancel, _ := b.Subscribe(make(chan EventA))
sub, _ := b.Subscribe(new(EventA))
time.Sleep(10 * time.Millisecond)
cancel()
sub.Close()
wg.Done()
}()
......@@ -174,15 +172,14 @@ func TestSubMany(t *testing.T) {
for i := 0; i < n; i++ {
go func() {
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
panic(err)
}
defer cancel()
defer sub.Close()
ready.Done()
atomic.AddInt32(&r, int32(<-events))
atomic.AddInt32(&r, int32((<-sub.Out()).(EventB)))
wait.Done()
}()
}
......@@ -205,8 +202,7 @@ func TestSubMany(t *testing.T) {
func TestSubType(t *testing.T) {
bus := NewBus()
events := make(chan fmt.Stringer)
cancel, err := bus.Subscribe(events, ForceSubType(new(EventA)))
sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)})
if err != nil {
t.Fatal(err)
}
......@@ -217,8 +213,8 @@ func TestSubType(t *testing.T) {
wait.Add(1)
go func() {
defer cancel()
event = <-events
defer sub.Close()
event = (<-sub.Out()).(EventA)
wait.Done()
}()
......@@ -244,15 +240,14 @@ func TestNonStateful(t *testing.T) {
}
defer em.Close()
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
sub1, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS()
defer sub1.Close()
select {
case <-eventsA:
case <-sub1.Out():
t.Fatal("didn't expect to get an event")
default:
}
......@@ -260,23 +255,22 @@ func TestNonStateful(t *testing.T) {
em.Emit(EventB(1))
select {
case e := <-eventsA:
if e != 1 {
case e := <-sub1.Out():
if e.(EventB) != 1 {
t.Fatal("got wrong event")
}
default:
t.Fatal("expected to get an event")
}
eventsB := make(chan EventB, 1)
cancelS2, err := bus.Subscribe(eventsB)
sub2, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS2()
defer sub2.Close()
select {
case <-eventsA:
case <-sub2.Out():
t.Fatal("didn't expect to get an event")
default:
}
......@@ -292,14 +286,13 @@ func TestStateful(t *testing.T) {
em.Emit(EventB(2))
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
sub, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS()
defer sub.Close()
if <-eventsA != 2 {
if (<-sub.Out()).(EventB) != 2 {
t.Fatal("got wrong event")
}
}
......@@ -320,16 +313,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
for i := 0; i < subs; i++ {
go func() {
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
panic(err)
}
defer cancel()
defer sub.Close()
ready.Done()
for i := 0; i < emits*msgs; i++ {
atomic.AddInt64(&r, int64(<-events))
e, ok := <-sub.Out()
if !ok {
panic("wat")
}
atomic.AddInt64(&r, int64(e.(EventB)))
}
wait.Done()
}()
......
......@@ -3,6 +3,6 @@ module github.com/libp2p/go-eventbus
go 1.12
require (
github.com/libp2p/go-libp2p-core v0.0.4
github.com/libp2p/go-libp2p-core v0.0.6
github.com/libp2p/go-libp2p-testing v0.0.4
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
......@@ -10,15 +12,20 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
......@@ -33,8 +40,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4 h1:wNg7b2tKrZlSNibP+j7A1v8eatjf4+9+YRw0L3DUeFE=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=
github.com/libp2p/go-libp2p-core v0.0.6 h1:SsYhfWJ47vLP1Rd9/0hqEm/W/PlFbC/3YLZyLCcvo1w=
github.com/libp2p/go-libp2p-core v0.0.6/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYeYPMJAUKpaCHrE=
github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/8/EIrEEFc=
github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
......@@ -64,6 +71,7 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
......@@ -71,9 +79,20 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......@@ -81,7 +100,15 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
package eventbus
import (
"errors"
"reflect"
"github.com/libp2p/go-libp2p-core/event"
)
type subSettings struct {
forcedType reflect.Type
buffer int
}
// ForceSubType is a Subscribe option which overrides the type to which
// the subscription will be done. Note that the evtType must be assignable
// to channel type.
//
// This also allows for subscribing to multiple eventbus channels with one
// Go channel to get better ordering guarantees.
//
// Example:
// type Event struct{}
// func (Event) String() string {
// return "event"
// }
//
// eventCh := make(chan fmt.Stringer) // interface { String() string }
// cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event)))
// [...]
func ForceSubType(evtType interface{}) event.SubscriptionOpt {
return func(settings interface{}) error {
s := settings.(*subSettings)
typ := reflect.TypeOf(evtType)
if typ.Kind() != reflect.Ptr {
return errors.New("ForceSubType called with non-pointer type")
}
s.forcedType = typ
func BufSize(n int) func(interface{}) error {
return func(s interface{}) error {
s.(*subSettings).buffer = n
return nil
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment