basic.go 4.57 KB
Newer Older
1
package eventbus
Łukasz Magiera's avatar
Łukasz Magiera committed
2 3 4 5 6 7

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Raúl Kripalani's avatar
Raúl Kripalani committed
9 10

	"github.com/libp2p/go-libp2p-core/event"
Łukasz Magiera's avatar
Łukasz Magiera committed
11 12 13 14 15
)

///////////////////////
// BUS

Łukasz Magiera's avatar
Łukasz Magiera committed
16
// basicBus is a type-based event delivery system
Łukasz Magiera's avatar
Łukasz Magiera committed
17
type basicBus struct {
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
18
	lk    sync.Mutex
Łukasz Magiera's avatar
Łukasz Magiera committed
19
	nodes map[reflect.Type]*node
Łukasz Magiera's avatar
Łukasz Magiera committed
20 21
}

Raúl Kripalani's avatar
Raúl Kripalani committed
22 23
var _ event.Bus = (*basicBus)(nil)

Łukasz Magiera's avatar
Łukasz Magiera committed
24
type emitter struct {
Raúl Kripalani's avatar
Raúl Kripalani committed
25 26 27 28 29 30
	n       *node
	typ     reflect.Type
	closed  int32
	dropper func(reflect.Type)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
31
func (e *emitter) Emit(evt interface{}) {
Raúl Kripalani's avatar
Raúl Kripalani committed
32 33 34 35 36 37
	if atomic.LoadInt32(&e.closed) != 0 {
		panic("emitter is closed")
	}
	e.n.emit(evt)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
38
func (e *emitter) Close() error {
Raúl Kripalani's avatar
Raúl Kripalani committed
39 40 41 42 43 44 45 46 47 48
	if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) {
		panic("closed an emitter more than once")
	}
	if atomic.AddInt32(&e.n.nEmitters, -1) == 0 {
		e.dropper(e.typ)
	}
	return nil
}

func NewBus() event.Bus {
Łukasz Magiera's avatar
Łukasz Magiera committed
49
	return &basicBus{
Łukasz Magiera's avatar
Łukasz Magiera committed
50
		nodes: map[reflect.Type]*node{},
Łukasz Magiera's avatar
Łukasz Magiera committed
51 52 53
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
54
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
55 56
	b.lk.Lock()

Łukasz Magiera's avatar
Łukasz Magiera committed
57
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
Łukasz Magiera committed
58 59
	if !ok {
		n = newNode(typ)
Łukasz Magiera's avatar
Łukasz Magiera committed
60
		b.nodes[typ] = n
Łukasz Magiera's avatar
Łukasz Magiera committed
61 62 63 64
	}

	n.lk.Lock()
	b.lk.Unlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
65

Łukasz Magiera's avatar
Łukasz Magiera committed
66
	cb(n)
Łukasz Magiera's avatar
Łukasz Magiera committed
67 68 69 70 71

	go func() {
		defer n.lk.Unlock()
		async(n)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
72

Łukasz Magiera's avatar
Łukasz Magiera committed
73 74 75
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
76
func (b *basicBus) tryDropNode(typ reflect.Type) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
77
	b.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
78
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
79 80 81 82 83 84
	if !ok { // already dropped
		b.lk.Unlock()
		return
	}

	n.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
85
	if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
86 87 88 89 90 91
		n.lk.Unlock()
		b.lk.Unlock()
		return // still in use
	}
	n.lk.Unlock()

Łukasz Magiera's avatar
Łukasz Magiera committed
92
	delete(b.nodes, typ)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
93 94 95
	b.lk.Unlock()
}

Łukasz Magiera's avatar
Łukasz Magiera committed
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
type sub struct {
	ch      chan interface{}
	nodes   []*node
	dropper func(reflect.Type)
}

func (s *sub) Out() <-chan interface{} {
	return s.ch
}

func (s *sub) Close() error {
	close(s.ch)
	for _, n := range s.nodes {
		n.lk.Lock()
		for i := 0; i < len(n.sinks); i++ {
			if n.sinks[i] == s.ch {
				n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
				n.sinks = n.sinks[:len(n.sinks)-1]
				break
			}
		}
		tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
		n.lk.Unlock()
		if tryDrop {
			s.dropper(n.typ)
		}
	}
	return nil
}

var _ event.Subscription = (*sub)(nil)

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
Łukasz Magiera's avatar
Łukasz Magiera committed
131
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
Jakub Sztandera's avatar
Jakub Sztandera committed
132
	settings := subSettings(subSettingsDefault)
Łukasz Magiera's avatar
Łukasz Magiera committed
133 134 135 136 137 138
	for _, opt := range opts {
		if err := opt(&settings); err != nil {
			return nil, err
		}
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
139 140 141
	types, ok := evtTypes.([]interface{})
	if !ok {
		types = []interface{}{evtTypes}
142
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
143 144 145 146 147 148

	out := &sub{
		ch:    make(chan interface{}, settings.buffer),
		nodes: make([]*node, len(types)),

		dropper: b.tryDropNode,
149 150
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
151 152 153 154 155
	for i, etyp := range types {
		typ := reflect.TypeOf(etyp)

		if typ.Kind() != reflect.Ptr {
			return nil, errors.New("subscribe called with non-pointer type")
Łukasz Magiera's avatar
Łukasz Magiera committed
156
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
157 158 159 160 161 162 163 164 165

		err = b.withNode(typ.Elem(), func(n *node) {
			n.sinks = append(n.sinks, out.ch)
			out.nodes[i] = n
		}, func(n *node) {
			if n.keepLast {
				l := n.last.Load()
				if l == nil {
					return
Jakub Sztandera's avatar
Jakub Sztandera committed
166
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
167
				out.ch <- l
Jakub Sztandera's avatar
Jakub Sztandera committed
168
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
169 170
		})
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
171

Łukasz Magiera's avatar
Łukasz Magiera committed
172
	return out, nil
Łukasz Magiera's avatar
Łukasz Magiera committed
173 174
}

Łukasz Magiera's avatar
Łukasz Magiera committed
175 176 177 178 179 180 181 182 183 184
// Emitter creates new emitter
//
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
Raúl Kripalani's avatar
Raúl Kripalani committed
185
func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e event.Emitter, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
186
	var settings emitterSettings
Jakub Sztandera's avatar
Jakub Sztandera committed
187

Łukasz Magiera's avatar
Łukasz Magiera committed
188
	for _, opt := range opts {
Raúl Kripalani's avatar
Raúl Kripalani committed
189 190 191
		if err := opt(&settings); err != nil {
			return nil, err
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
192 193
	}

194 195
	typ := reflect.TypeOf(evtType)
	if typ.Kind() != reflect.Ptr {
Łukasz Magiera's avatar
Łukasz Magiera committed
196
		return nil, errors.New("emitter called with non-pointer type")
197 198 199 200
	}
	typ = typ.Elem()

	err = b.withNode(typ, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
201
		atomic.AddInt32(&n.nEmitters, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
202
		n.keepLast = n.keepLast || settings.makeStateful
Łukasz Magiera's avatar
Łukasz Magiera committed
203
		e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
Łukasz Magiera's avatar
Łukasz Magiera committed
204
	}, func(_ *node) {})
Łukasz Magiera's avatar
Łukasz Magiera committed
205 206 207 208 209 210 211
	return
}

///////////////////////
// NODE

type node struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
212
	// Note: make sure to NEVER lock basicBus.lk when this lock is held
Łukasz Magiera's avatar
Łukasz Magiera committed
213 214 215 216
	lk sync.RWMutex

	typ reflect.Type

Łukasz Magiera's avatar
Łukasz Magiera committed
217
	// emitter ref count
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
218
	nEmitters int32
Łukasz Magiera's avatar
Łukasz Magiera committed
219

Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
220
	keepLast bool
Łukasz Magiera's avatar
Łukasz Magiera committed
221
	last     atomic.Value
Jakub Sztandera's avatar
Jakub Sztandera committed
222

Łukasz Magiera's avatar
Łukasz Magiera committed
223
	sinks []chan interface{}
Łukasz Magiera's avatar
Łukasz Magiera committed
224 225 226 227 228 229 230 231
}

func newNode(typ reflect.Type) *node {
	return &node{
		typ: typ,
	}
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
232
func (n *node) emit(event interface{}) {
Steven Allen's avatar
Steven Allen committed
233 234 235
	typ := reflect.TypeOf(event)
	if typ != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
Łukasz Magiera's avatar
Łukasz Magiera committed
236 237 238
	}

	n.lk.RLock()
Łukasz Magiera's avatar
Łukasz Magiera committed
239
	if n.keepLast {
Łukasz Magiera's avatar
Łukasz Magiera committed
240
		n.last.Store(event)
Łukasz Magiera's avatar
Łukasz Magiera committed
241 242
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
243
	for _, ch := range n.sinks {
Łukasz Magiera's avatar
Łukasz Magiera committed
244
		ch <- event
Łukasz Magiera's avatar
Łukasz Magiera committed
245
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
246
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
247
}