basic.go 2.61 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7
package event

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
8
	"sync/atomic"
Łukasz Magiera's avatar
Łukasz Magiera committed
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
)

///////////////////////
// BUS

// bus is the Bus implementation in this file. It keeps one node per event
// type, keyed by the string that typePath produces for that type.
type bus struct {
	// lk is held by withNode and tryDropNode while they read or modify nodes.
	lk sync.Mutex

	// nodes maps a type path to the node for that event type.
	nodes map[string]*node
}

// NewBus returns a Bus with an empty, ready-to-use node map.
func NewBus() Bus {
	b := new(bus)
	b.nodes = make(map[string]*node)
	return b
}

// withNode runs cb on the node for the event type that evtType points to,
// creating and registering that node first if the bus has none for it yet.
//
// evtType must be a pointer (for example new(MyEvent)); its element type
// selects the node. For a nil or non-pointer evtType an error is returned and
// cb is not called.
//
// cb runs with the node's write lock held and the bus lock already released,
// so cb must not try to lock the node itself.
func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
	typ := reflect.TypeOf(evtType)
	// reflect.TypeOf returns nil for a nil interface value; calling Kind on
	// that would panic, so reject it together with the non-pointer case.
	if typ == nil || typ.Kind() != reflect.Ptr {
		return errors.New("subscribe called with non-pointer type")
	}
	typ = typ.Elem()
	path := typePath(typ)

	b.lk.Lock()

	n, ok := b.nodes[path]
	if !ok {
		n = newNode(typ)
		b.nodes[path] = n
	}

	// Take the node lock before letting go of the bus lock, so the node is
	// already locked by the time anyone else can look it up.
	n.lk.Lock()
	b.lk.Unlock()
	defer n.lk.Unlock() // released even if cb panics
	cb(n)
	return nil
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// tryDropNode removes the node for the event type that evtType points to from
// the bus, provided the node has no emitters and no sinks left. It does
// nothing when the node is already gone or still in use.
//
// evtType must be a non-nil pointer; the callers in this file only pass values
// that withNode has already accepted.
func (b *bus) tryDropNode(evtType interface{}) {
	path := typePath(reflect.TypeOf(evtType).Elem())

	b.lk.Lock()
	defer b.lk.Unlock()

	n, ok := b.nodes[path]
	if !ok { // already dropped
		return
	}

	n.lk.Lock()
	// nEmitters is modified with atomic.AddInt32 by code that does not hold
	// n.lk, so it has to be read atomically here as well.
	inUse := atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0
	n.lk.Unlock()
	if inUse {
		return // still in use
	}

	delete(b.nodes, path)
}

func (b *bus) Subscribe(evtType interface{}) (s <-chan interface{}, c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
71
	err = b.withNode(evtType, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
72 73 74 75 76 77 78 79 80 81 82 83
		out, i := n.sub(0)
		s = out
		c = func() {
			n.lk.Lock()
			delete(n.sinks, i)
			close(out)
			tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
			n.lk.Unlock()
			if tryDrop {
				b.tryDropNode(evtType)
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
84 85 86 87
	})
	return
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
88
func (b *bus) Emitter(evtType interface{}) (e EmitFunc, c CancelFunc, err error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
89
	err = b.withNode(evtType, func(n *node) {
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
		atomic.AddInt32(&n.nEmitters, 1)
		closed := false

		e = func(event interface{}) {
			if closed {
				panic("emitter is closed")
			}
			n.emit(event)
		}

		c = func() {
			closed = true
			if atomic.AddInt32(&n.nEmitters, -1) == 0 {
				b.tryDropNode(evtType)
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118
		}
	})
	return
}

///////////////////////
// NODE

// node holds the per-event-type state of the bus: the event type itself, the
// number of live emitters and the channels of all current subscribers.
type node struct {
	// Note: make sure to NEVER lock bus.lk when this lock is held
	lk sync.RWMutex

	// typ is the event type this node accepts; emit panics on any other type.
	typ reflect.Type

	// nEmitters counts live emitters. Emitter increments and decrements it
	// with sync/atomic.
	nEmitters int32

	// nSinks is the key that sub hands out next. It is only ever incremented,
	// so keys are not reused after a sink is removed.
	nSinks int

	// sinks maps the key returned by sub to that subscriber's channel.
	sinks map[int]chan interface{}
}

// newNode builds a node for event type typ with an initialized, empty sink
// map. All other fields start at their zero values.
func newNode(typ reflect.Type) *node {
	n := new(node)
	n.typ = typ
	n.sinks = make(map[int]chan interface{})
	return n
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
132
func (n *node) sub(buf int) (chan interface{}, int) {
Łukasz Magiera's avatar
Łukasz Magiera committed
133
	out := make(chan interface{}, buf)
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
134 135 136 137
	i := n.nSinks
	n.nSinks++
	n.sinks[i] = out
	return out, i
Łukasz Magiera's avatar
Łukasz Magiera committed
138 139
}

Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
140
func (n *node) emit(event interface{}) {
Łukasz Magiera's avatar
Łukasz Magiera committed
141 142 143 144 145 146 147 148 149
	etype := reflect.TypeOf(event)
	if etype != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype))
	}

	n.lk.RLock()
	for _, ch := range n.sinks {
		ch <- event
	}
Łukasz Magiera's avatar
MVP  
Łukasz Magiera committed
150
	n.lk.RUnlock()
Łukasz Magiera's avatar
Łukasz Magiera committed
151 152 153 154 155 156 157 158 159 160
}

///////////////////////
// UTILS

// typePath builds the key under which the bus stores the node for t: the
// type's package path and its String form, joined by a slash.
func typePath(t reflect.Type) string {
	pkg, name := t.PkgPath(), t.String()
	return pkg + "/" + name
}

// Compile-time check that *bus implements Bus.
var _ Bus = (*bus)(nil)