basic.go 2.75 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7
package event

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
)

///////////////////////
// BUS

type bus struct {
	lk sync.Mutex
	nodes map[string]*node
}

func NewBus() Bus {
	return &bus{
		nodes: map[string]*node{},
	}
}

func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
	typ := reflect.TypeOf(evtType)
	if typ.Kind() != reflect.Ptr {
		return errors.New("subscribe called with non-pointer type")
	}
	typ = typ.Elem()
	path := typePath(typ)

	b.lk.Lock()

	n, ok := b.nodes[path]
	if !ok {
		n = newNode(typ)
		b.nodes[path] = n
	}

	n.lk.Lock()
	b.lk.Unlock()
	cb(n)
	n.lk.Unlock()
	return nil
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
func (b *bus) tryDropNode(evtType interface{}) {
	path := typePath(reflect.TypeOf(evtType).Elem())

	b.lk.Lock()
	n, ok := b.nodes[path]
	if !ok { // already dropped
		b.lk.Unlock()
		return
	}

	n.lk.Lock()
	if n.nEmitters > 0 || len(n.sinks) > 0 {
		n.lk.Unlock()
		b.lk.Unlock()
		return // still in use
	}
	n.lk.Unlock()

	delete(b.nodes, path)
	b.lk.Unlock()
}

70
func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface{}, c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
71
	err = b.withNode(evtType, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
72 73 74 75 76 77 78 79 80 81 82 83
		out, i := n.sub(0)
		s = out
		c = func() {
			n.lk.Lock()
			delete(n.sinks, i)
			close(out)
			tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
			n.lk.Unlock()
			if tryDrop {
				b.tryDropNode(evtType)
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
84 85 86 87
	})
	return
}

88
func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
89
	err = b.withNode(evtType, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
		atomic.AddInt32(&n.nEmitters, 1)
		closed := false

		e = func(event interface{}) {
			if closed {
				panic("emitter is closed")
			}
			n.emit(event)
		}

		c = func() {
			closed = true
			if atomic.AddInt32(&n.nEmitters, -1) == 0 {
				b.tryDropNode(evtType)
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118
		}
	})
	return
}

///////////////////////
// NODE

type node struct {
	// Note: make sure to NEVER lock bus.lk when this lock is held
	lk sync.RWMutex

	typ reflect.Type

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
119 120
	nEmitters int32
	nSinks int
121 122 123

	// TODO: we could make emit a bit faster by making this into an array, but
	//  it doesn't seem needed for now
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
124
	sinks  map[int]chan interface{}
Łukasz Magiera's avatar
Łukasz Magiera committed
125 126 127 128 129 130 131 132 133 134
}

func newNode(typ reflect.Type) *node {
	return &node{
		typ: typ,

		sinks: map[int]chan interface{}{},
	}
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
135
func (n *node) sub(buf int) (chan interface{}, int) {
Łukasz Magiera's avatar
Łukasz Magiera committed
136
	out := make(chan interface{}, buf)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
137 138 139 140
	i := n.nSinks
	n.nSinks++
	n.sinks[i] = out
	return out, i
Łukasz Magiera's avatar
Łukasz Magiera committed
141 142
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
143
func (n *node) emit(event interface{}) {
Łukasz Magiera's avatar
Łukasz Magiera committed
144 145 146 147 148 149 150 151 152
	etype := reflect.TypeOf(event)
	if etype != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype))
	}

	n.lk.RLock()
	for _, ch := range n.sinks {
		ch <- event
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
153
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
154 155 156 157 158 159 160 161 162 163
}

///////////////////////
// UTILS

func typePath(t reflect.Type) string {
	return t.PkgPath() + "/" + t.String()
}

var _ Bus = &bus{}