basic.go 3.57 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7
package event

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10 11 12 13
)

///////////////////////
// BUS

Łukasz Magiera's avatar
Łukasz Magiera committed
14
type basicBus struct {
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
15
	lk    sync.Mutex
Łukasz Magiera's avatar
Łukasz Magiera committed
16
	nodes map[reflect.Type]*node
Łukasz Magiera's avatar
Łukasz Magiera committed
17 18
}

Łukasz Magiera's avatar
Łukasz Magiera committed
19 20
func NewBus() *basicBus {
	return &basicBus{
Łukasz Magiera's avatar
Łukasz Magiera committed
21
		nodes: map[reflect.Type]*node{},
Łukasz Magiera's avatar
Łukasz Magiera committed
22 23 24
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
25
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
26 27
	b.lk.Lock()

Łukasz Magiera's avatar
Łukasz Magiera committed
28
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
Łukasz Magiera committed
29 30
	if !ok {
		n = newNode(typ)
Łukasz Magiera's avatar
Łukasz Magiera committed
31
		b.nodes[typ] = n
Łukasz Magiera's avatar
Łukasz Magiera committed
32 33 34 35
	}

	n.lk.Lock()
	b.lk.Unlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
36

Łukasz Magiera's avatar
Łukasz Magiera committed
37
	cb(n)
Łukasz Magiera's avatar
Łukasz Magiera committed
38 39 40 41 42

	go func() {
		defer n.lk.Unlock()
		async(n)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
43

Łukasz Magiera's avatar
Łukasz Magiera committed
44 45 46
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
47
func (b *basicBus) tryDropNode(typ reflect.Type) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
48
	b.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
49
	n, ok := b.nodes[typ]
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
50 51 52 53 54 55
	if !ok { // already dropped
		b.lk.Unlock()
		return
	}

	n.lk.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
56
	if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
57 58 59 60 61 62
		n.lk.Unlock()
		b.lk.Unlock()
		return // still in use
	}
	n.lk.Unlock()

Łukasz Magiera's avatar
Łukasz Magiera committed
63
	delete(b.nodes, typ)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
64 65 66
	b.lk.Unlock()
}

Łukasz Magiera's avatar
Łukasz Magiera committed
67
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
68
	var settings subSettings
Łukasz Magiera's avatar
Łukasz Magiera committed
69 70 71 72 73 74
	for _, opt := range opts {
		if err := opt(&settings); err != nil {
			return nil, err
		}
	}

75 76 77 78 79
	refCh := reflect.ValueOf(typedChan)
	typ := refCh.Type()
	if typ.Kind() != reflect.Chan {
		return nil, errors.New("expected a channel")
	}
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
80
	if typ.ChanDir()&reflect.SendDir == 0 {
81 82 83
		return nil, errors.New("channel doesn't allow send")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
84 85 86 87 88 89 90
	if settings.forcedType != nil {
		if settings.forcedType.Elem().AssignableTo(typ) {
			return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
		}
		typ = settings.forcedType
	}

91
	err = b.withNode(typ.Elem(), func(n *node) {
Łukasz Magiera's avatar
Łukasz Magiera committed
92
		n.sinks = append(n.sinks, refCh)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
93 94
		c = func() {
			n.lk.Lock()
Jakub Sztandera's avatar
Jakub Sztandera committed
95 96
			for i := 0; i < len(n.sinks); i++ {
				if n.sinks[i] == refCh {
Łukasz Magiera's avatar
Łukasz Magiera committed
97
					n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
Jakub Sztandera's avatar
Jakub Sztandera committed
98 99 100 101
					n.sinks = n.sinks[:len(n.sinks)-1]
					break
				}
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
102
			tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
103 104
			n.lk.Unlock()
			if tryDrop {
105
				b.tryDropNode(typ.Elem())
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
106 107
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
108 109 110 111 112 113 114 115 116
	}, func(n *node) {
		if n.keepLast {
			lastVal, ok := n.last.Load().(reflect.Value)
			if !ok {
				return
			}

			refCh.Send(lastVal)
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
117 118 119 120
	})
	return
}

Łukasz Magiera's avatar
Łukasz Magiera committed
121
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
122
	var settings emitterSettings
Łukasz Magiera's avatar
Łukasz Magiera committed
123 124 125 126
	for _, opt := range opts {
		opt(&settings)
	}

127 128
	typ := reflect.TypeOf(evtType)
	if typ.Kind() != reflect.Ptr {
Łukasz Magiera's avatar
Łukasz Magiera committed
129
		return nil, errors.New("emitter called with non-pointer type")
130 131 132 133
	}
	typ = typ.Elem()

	err = b.withNode(typ, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
134 135
		atomic.AddInt32(&n.nEmitters, 1)
		closed := false
Łukasz Magiera's avatar
Łukasz Magiera committed
136
		n.keepLast = n.keepLast || settings.makeStateful
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
137 138 139 140 141

		e = func(event interface{}) {
			if closed {
				panic("emitter is closed")
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
142 143 144 145 146 147
			if event == closeEmit {
				closed = true
				if atomic.AddInt32(&n.nEmitters, -1) == 0 {
					b.tryDropNode(typ)
				}
				return
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
148
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
149
			n.emit(event)
Łukasz Magiera's avatar
Łukasz Magiera committed
150
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
151
	}, func(_ *node) {})
Łukasz Magiera's avatar
Łukasz Magiera committed
152 153 154 155 156 157 158
	return
}

///////////////////////
// NODE

// node holds the per-event-type state of the bus: the subscribed channels,
// the number of live emitters and, optionally, the most recent event.
type node struct {
	// lk guards keepLast and sinks.
	// Note: make sure to NEVER lock basicBus.lk when this lock is held
	lk sync.RWMutex

	// typ is the event type this node accepts; emit panics on any other type.
	typ reflect.Type

	// nEmitters is the emitter ref count; it is accessed atomically.
	nEmitters int32

	// keepLast makes emit remember the most recent event in last.
	keepLast bool

	// last stores the most recent event as a reflect.Value when keepLast is set.
	last atomic.Value

	// sinks are the subscribed channels, each wrapped in a reflect.Value.
	sinks []reflect.Value
}

// newNode allocates a node for events of type typ with no sinks and no emitters.
func newNode(typ reflect.Type) *node {
	nd := new(node)
	nd.typ = typ
	return nd
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
179
func (n *node) emit(event interface{}) {
180 181 182
	eval := reflect.ValueOf(event)
	if eval.Type() != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type()))
Łukasz Magiera's avatar
Łukasz Magiera committed
183 184 185
	}

	n.lk.RLock()
Łukasz Magiera's avatar
Łukasz Magiera committed
186 187 188 189
	if n.keepLast {
		n.last.Store(eval)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
190
	for _, ch := range n.sinks {
191
		ch.Send(eval)
Łukasz Magiera's avatar
Łukasz Magiera committed
192
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
193
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
194 195 196 197 198
}

///////////////////////
// UTILS

Łukasz Magiera's avatar
Łukasz Magiera committed
199
// Compile-time check that *basicBus satisfies the Bus interface.
var _ Bus = (*basicBus)(nil)