basic.go 3.67 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7
package event

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10 11 12 13 14
)

///////////////////////
// BUS

type bus struct {
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
15
	lk    sync.Mutex
Łukasz Magiera's avatar
Łukasz Magiera committed
16 17 18 19 20 21 22 23 24
	nodes map[string]*node
}

// NewBus creates a Bus backed by an empty node table.
func NewBus() Bus {
	b := new(bus)
	b.nodes = make(map[string]*node)
	return b
}

Łukasz Magiera's avatar
Łukasz Magiera committed
25
func (b *bus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
Łukasz Magiera's avatar
Łukasz Magiera committed
26 27 28 29 30 31 32 33 34 35 36 37
	path := typePath(typ)

	b.lk.Lock()

	n, ok := b.nodes[path]
	if !ok {
		n = newNode(typ)
		b.nodes[path] = n
	}

	n.lk.Lock()
	b.lk.Unlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
38 39 40 41 42 43 44 45 46 47 48 49

	ok = false
	defer func() {
		if !ok {
			n.lk.Unlock()
		}
		go func() {
			defer n.lk.Unlock()
			async(n)
		}()
	}()

Łukasz Magiera's avatar
Łukasz Magiera committed
50
	cb(n)
Łukasz Magiera's avatar
Łukasz Magiera committed
51 52
	ok = true

Łukasz Magiera's avatar
Łukasz Magiera committed
53 54 55
	return nil
}

56 57
func (b *bus) tryDropNode(typ reflect.Type) {
	path := typePath(typ)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77

	b.lk.Lock()
	n, ok := b.nodes[path]
	if !ok { // already dropped
		b.lk.Unlock()
		return
	}

	n.lk.Lock()
	if n.nEmitters > 0 || len(n.sinks) > 0 {
		n.lk.Unlock()
		b.lk.Unlock()
		return // still in use
	}
	n.lk.Unlock()

	delete(b.nodes, path)
	b.lk.Unlock()
}

Łukasz Magiera's avatar
Łukasz Magiera committed
78 79 80 81 82 83 84 85
func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
	var settings SubSettings
	for _, opt := range opts {
		if err := opt(&settings); err != nil {
			return nil, err
		}
	}

86 87 88 89 90
	refCh := reflect.ValueOf(typedChan)
	typ := refCh.Type()
	if typ.Kind() != reflect.Chan {
		return nil, errors.New("expected a channel")
	}
Łukasz Magiera's avatar
Gofmt  
Łukasz Magiera committed
91
	if typ.ChanDir()&reflect.SendDir == 0 {
92 93 94
		return nil, errors.New("channel doesn't allow send")
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
95 96 97 98 99 100 101
	if settings.forcedType != nil {
		if settings.forcedType.Elem().AssignableTo(typ) {
			return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
		}
		typ = settings.forcedType
	}

102
	err = b.withNode(typ.Elem(), func(n *node) {
Łukasz Magiera's avatar
Łukasz Magiera committed
103
		n.sinks = append(n.sinks, refCh)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
104 105
		c = func() {
			n.lk.Lock()
Jakub Sztandera's avatar
Jakub Sztandera committed
106 107 108 109 110 111 112
			for i := 0; i < len(n.sinks); i++ {
				if n.sinks[i] == refCh {
					n.sinks[i] = n.sinks[len(n.sinks)-1]
					n.sinks = n.sinks[:len(n.sinks)-1]
					break
				}
			}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
113 114 115
			tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
			n.lk.Unlock()
			if tryDrop {
116
				b.tryDropNode(typ.Elem())
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
117 118
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
119 120 121 122 123 124 125 126 127
	}, func(n *node) {
		if n.keepLast {
			lastVal, ok := n.last.Load().(reflect.Value)
			if !ok {
				return
			}

			refCh.Send(lastVal)
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130 131
	})
	return
}

Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134 135 136 137
func (b *bus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, c CancelFunc, err error) {
	var settings EmitterSettings
	for _, opt := range opts {
		opt(&settings)
	}

138 139 140 141 142 143 144
	typ := reflect.TypeOf(evtType)
	if typ.Kind() != reflect.Ptr {
		return nil, nil, errors.New("emitter called with non-pointer type")
	}
	typ = typ.Elem()

	err = b.withNode(typ, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
145 146
		atomic.AddInt32(&n.nEmitters, 1)
		closed := false
Łukasz Magiera's avatar
Łukasz Magiera committed
147
		n.keepLast = n.keepLast || settings.makeStateful
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
148 149 150 151 152 153 154 155 156 157 158

		e = func(event interface{}) {
			if closed {
				panic("emitter is closed")
			}
			n.emit(event)
		}

		c = func() {
			closed = true
			if atomic.AddInt32(&n.nEmitters, -1) == 0 {
159
				b.tryDropNode(typ)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
160
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
161
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
162
	}, func(_ *node) {})
Łukasz Magiera's avatar
Łukasz Magiera committed
163 164 165 166 167 168 169 170 171 172 173 174
	return
}

///////////////////////
// NODE

// node holds the bus state for a single event type: the subscribed sinks, the
// number of live emitters and, optionally, the last emitted event.
type node struct {
	// Note: make sure to NEVER lock bus.lk when this lock is held
	lk sync.RWMutex

	// typ is the only event type emit accepts for this node.
	typ reflect.Type

	// emitter ref count; updated with sync/atomic by Emitter
	nEmitters int32

	// keepLast makes emit store every event in last so that it can be
	// replayed to channels that subscribe later.
	keepLast bool
	last     atomic.Value

	// sinks are the subscribed channels, stored as reflect values.
	sinks []reflect.Value
}

// newNode allocates a node for events of type typ with no sinks and no
// emitters.
func newNode(typ reflect.Type) *node {
	n := new(node)
	n.typ = typ
	return n
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
190
func (n *node) emit(event interface{}) {
191 192 193
	eval := reflect.ValueOf(event)
	if eval.Type() != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type()))
Łukasz Magiera's avatar
Łukasz Magiera committed
194 195 196
	}

	n.lk.RLock()
Łukasz Magiera's avatar
Łukasz Magiera committed
197 198 199 200
	if n.keepLast {
		n.last.Store(eval)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
201
	// TODO: try using reflect.Select
Łukasz Magiera's avatar
Łukasz Magiera committed
202
	for _, ch := range n.sinks {
203
		ch.Send(eval)
Łukasz Magiera's avatar
Łukasz Magiera committed
204
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
205
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209 210 211 212 213 214 215
}

///////////////////////
// UTILS

// typePath builds the key under which a type's node is stored: the type's
// package path, a slash, and the type's string form.
func typePath(t reflect.Type) string {
	pkg, name := t.PkgPath(), t.String()
	return pkg + "/" + name
}

// Compile-time check that *bus implements Bus.
var _ Bus = (*bus)(nil)