basic.go 4.47 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7
package event

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10 11 12 13
)

///////////////////////
// BUS

Łukasz Magiera's avatar
Łukasz Magiera committed
14
// basicBus is a type-based event delivery system
Łukasz Magiera's avatar
Łukasz Magiera committed
15
type basicBus struct {
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
16
	lk    sync.Mutex
Łukasz Magiera's avatar
Łukasz Magiera committed
17
	nodes map[reflect.Type]*node
Łukasz Magiera's avatar
Łukasz Magiera committed
18 19
}

Łukasz Magiera's avatar
Łukasz Magiera committed
20 21
func NewBus() *basicBus {
	return &basicBus{
Łukasz Magiera's avatar
Łukasz Magiera committed
22
		nodes: map[reflect.Type]*node{},
Łukasz Magiera's avatar
Łukasz Magiera committed
23 24 25
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
26
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
27 28
	b.lk.Lock()

Łukasz Magiera's avatar
Łukasz Magiera committed
29
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
Łukasz Magiera committed
30 31
	if !ok {
		n = newNode(typ)
Łukasz Magiera's avatar
Łukasz Magiera committed
32
		b.nodes[typ] = n
Łukasz Magiera's avatar
Łukasz Magiera committed
33 34 35 36
	}

	n.lk.Lock()
	b.lk.Unlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
37

Łukasz Magiera's avatar
Łukasz Magiera committed
38
	cb(n)
Łukasz Magiera's avatar
Łukasz Magiera committed
39 40 41 42 43

	go func() {
		defer n.lk.Unlock()
		async(n)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
44

Łukasz Magiera's avatar
Łukasz Magiera committed
45 46 47
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
48
func (b *basicBus) tryDropNode(typ reflect.Type) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
49
	b.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
50
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
51 52 53 54 55 56
	if !ok { // already dropped
		b.lk.Unlock()
		return
	}

	n.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
57
	if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
58 59 60 61 62 63
		n.lk.Unlock()
		b.lk.Unlock()
		return // still in use
	}
	n.lk.Unlock()

Łukasz Magiera's avatar
Łukasz Magiera committed
64
	delete(b.nodes, typ)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
65 66 67
	b.lk.Unlock()
}

Łukasz Magiera's avatar
Łukasz Magiera committed
68 69 70 71 72 73 74 75 76
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
Łukasz Magiera's avatar
Łukasz Magiera committed
77
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
78
	var settings subSettings
Łukasz Magiera's avatar
Łukasz Magiera committed
79 80 81 82 83 84
	for _, opt := range opts {
		if err := opt(&settings); err != nil {
			return nil, err
		}
	}

85 86 87 88 89
	refCh := reflect.ValueOf(typedChan)
	typ := refCh.Type()
	if typ.Kind() != reflect.Chan {
		return nil, errors.New("expected a channel")
	}
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
90
	if typ.ChanDir()&reflect.SendDir == 0 {
91 92 93
		return nil, errors.New("channel doesn't allow send")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
94 95 96 97 98 99 100
	if settings.forcedType != nil {
		if settings.forcedType.Elem().AssignableTo(typ) {
			return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
		}
		typ = settings.forcedType
	}

101
	err = b.withNode(typ.Elem(), func(n *node) {
Łukasz Magiera's avatar
Łukasz Magiera committed
102
		n.sinks = append(n.sinks, refCh)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
103 104
		c = func() {
			n.lk.Lock()
Jakub Sztandera's avatar
Jakub Sztandera committed
105 106
			for i := 0; i < len(n.sinks); i++ {
				if n.sinks[i] == refCh {
Łukasz Magiera's avatar
Łukasz Magiera committed
107
					n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
Jakub Sztandera's avatar
Jakub Sztandera committed
108 109 110 111
					n.sinks = n.sinks[:len(n.sinks)-1]
					break
				}
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
112
			tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
113 114
			n.lk.Unlock()
			if tryDrop {
115
				b.tryDropNode(typ.Elem())
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
116 117
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119 120 121 122 123 124 125 126
	}, func(n *node) {
		if n.keepLast {
			lastVal, ok := n.last.Load().(reflect.Value)
			if !ok {
				return
			}

			refCh.Send(lastVal)
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
127 128 129 130
	})
	return
}

Łukasz Magiera's avatar
Łukasz Magiera committed
131 132 133 134 135 136 137 138 139 140
// Emitter creates new emitter
//
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
Łukasz Magiera's avatar
Łukasz Magiera committed
141
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
142
	var settings emitterSettings
Łukasz Magiera's avatar
Łukasz Magiera committed
143 144 145 146
	for _, opt := range opts {
		opt(&settings)
	}

147 148
	typ := reflect.TypeOf(evtType)
	if typ.Kind() != reflect.Ptr {
Łukasz Magiera's avatar
Łukasz Magiera committed
149
		return nil, errors.New("emitter called with non-pointer type")
150 151 152 153
	}
	typ = typ.Elem()

	err = b.withNode(typ, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
154 155
		atomic.AddInt32(&n.nEmitters, 1)
		closed := false
Łukasz Magiera's avatar
Łukasz Magiera committed
156
		n.keepLast = n.keepLast || settings.makeStateful
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
157 158 159 160 161

		e = func(event interface{}) {
			if closed {
				panic("emitter is closed")
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
162 163 164 165 166 167
			if event == closeEmit {
				closed = true
				if atomic.AddInt32(&n.nEmitters, -1) == 0 {
					b.tryDropNode(typ)
				}
				return
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
168
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
169
			n.emit(event)
Łukasz Magiera's avatar
Łukasz Magiera committed
170
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
171
	}, func(_ *node) {})
Łukasz Magiera's avatar
Łukasz Magiera committed
172 173 174 175 176 177 178
	return
}

///////////////////////
// NODE

// node holds the per-event-type state of the bus: how many emitters exist for
// the type and which subscriber channels receive its events.
type node struct {
	// lk guards keepLast and sinks.
	//
	// Note: make sure to NEVER lock basicBus.lk when this lock is held
	lk sync.RWMutex

	// typ is the event type this node delivers.
	typ reflect.Type

	// nEmitters is the emitter ref count; it is accessed atomically.
	nEmitters int32

	// keepLast tells emit to remember the most recent event in last.
	keepLast bool

	// last holds the most recent event as a reflect.Value when keepLast is
	// set.
	last atomic.Value

	// sinks are the subscriber channels, stored as reflect values.
	sinks []reflect.Value
}

// newNode returns a node for the given event type with no emitters and no
// subscribers.
func newNode(typ reflect.Type) *node {
	fresh := new(node)
	fresh.typ = typ
	return fresh
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
199
func (n *node) emit(event interface{}) {
200 201 202
	eval := reflect.ValueOf(event)
	if eval.Type() != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type()))
Łukasz Magiera's avatar
Łukasz Magiera committed
203 204 205
	}

	n.lk.RLock()
Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209
	if n.keepLast {
		n.last.Store(eval)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
210
	for _, ch := range n.sinks {
211
		ch.Send(eval)
Łukasz Magiera's avatar
Łukasz Magiera committed
212
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
213
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
214 215 216
}

///////////////////////
Łukasz Magiera's avatar
Łukasz Magiera committed
217
// TYPES
Łukasz Magiera's avatar
Łukasz Magiera committed
218

Łukasz Magiera's avatar
Łukasz Magiera committed
219 220 221 222 223 224 225 226 227 228 229 230 231
// closeEmitSignal is the private type of the close sentinel. Using a named
// type keeps ordinary events, including a plain struct{}{}, from comparing
// equal to the sentinel.
type closeEmitSignal struct{}

// closeEmit is the sentinel passed to an EmitFunc to close the emitter.
var closeEmit closeEmitSignal

// EmitFunc emits events. If any channel subscribed to the topic is blocked,
// calls to EmitFunc will block.
//
// Calling this function with the wrong event type will cause a panic.
type EmitFunc func(event interface{})

// Close closes the emitter by invoking it with the closeEmit sentinel.
// An EmitFunc created by basicBus.Emitter panics when it is called again
// after having been closed.
func (f EmitFunc) Close() {
	f(closeEmit)
}

// CancelFunc cancels a subscription. The one returned by basicBus.Subscribe
// removes the subscribed channel from its node's sinks.
type CancelFunc func()