basic.go 2.15 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
package event

import (
	"errors"
	"fmt"
	"io"
	"reflect"
	"sync"
)

///////////////////////
// BUS

// bus is the Bus implementation returned by NewBus. It keeps one node per
// event type, indexed by the string typePath produces for that type.
type bus struct {
	// lk is held while nodes is read or modified.
	lk    sync.Mutex
	nodes map[string]*node
}

// NewBus returns a Bus that has no event types registered yet.
func NewBus() Bus {
	b := new(bus)
	b.nodes = make(map[string]*node)
	return b
}

// withNode looks up the node for the event type that evtType points to,
// creating and registering it on first use, and then calls cb with that
// node's lock held.
//
// evtType must be a pointer to the event type (for example new(MyEvent)).
// Any other value, including a nil interface value, yields an error and cb
// is not called.
func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
	typ := reflect.TypeOf(evtType)
	// reflect.TypeOf returns nil for a nil interface value; check that before
	// calling Kind so a nil argument is reported as an error, not a panic.
	if typ == nil || typ.Kind() != reflect.Ptr {
		return errors.New("subscribe called with non-pointer type")
	}
	typ = typ.Elem()
	path := typePath(typ)

	b.lk.Lock()
	n, ok := b.nodes[path]
	if !ok {
		n = newNode(typ)
		b.nodes[path] = n
	}

	// The node lock is taken before the bus lock is released; the bus lock is
	// never acquired while a node lock is held.
	n.lk.Lock()
	b.lk.Unlock()
	// Deferred so the node is unlocked even if cb panics.
	defer n.lk.Unlock()

	cb(n)
	return nil
}

// Sub registers a new subscription for the event type that evtType points
// to. The subscription is created with a buffer size of 0. Any error from
// withNode is returned as is, in which case s is nil.
func (b *bus) Sub(evtType interface{}) (s Subscription, err error) {
	attach := func(target *node) {
		s = target.sub(0)
	}
	err = b.withNode(evtType, attach)
	return s, err
}

// Emitter returns an Emitter bound to the node of the event type that
// evtType points to. Any error from withNode is returned as is, in which
// case e is nil.
func (b *bus) Emitter(evtType interface{}) (e Emitter, err error) {
	build := func(target *node) {
		// Closing the emitter is currently a no-op.
		var noop closer = func() {} //TODO: actually do something here
		e = &emitter{
			node:   target,
			Closer: noop,
		}
	}
	err = b.withNode(evtType, build)
	return e, err
}

///////////////////////
// NODE

type node struct {
	// Note: make sure to NEVER lock bus.lk when this lock is held
	lk sync.RWMutex

	typ reflect.Type

	n     int
	sinks map[int]chan interface{}
}

// newNode returns a node for events of type typ with no sinks registered.
func newNode(typ reflect.Type) *node {
	fresh := new(node)
	fresh.typ = typ
	fresh.sinks = make(map[int]chan interface{})
	return fresh
}

// sub adds a new sink channel with capacity buf to the node and returns a
// Subscription that reads from it. It modifies n.n and n.sinks without
// locking, so the caller is expected to hold n.lk.
//
// NOTE(review): the returned sub has no Closer set, so calling Close on it
// would go through a nil embedded interface - confirm whether that is
// intended.
func (n *node) sub(buf int) Subscription {
	ch := make(chan interface{}, buf)

	// Ids are handed out sequentially, starting at 1.
	n.n++
	id := n.n
	n.sinks[id] = ch

	return &sub{out: ch}
}

// Emit sends event to every sink currently registered on the node. Each send
// is a plain channel send, so Emit blocks until every sink has accepted the
// event.
//
// Emit panics if the dynamic type of event is not exactly the node's type.
func (n *node) Emit(event interface{}) {
	etype := reflect.TypeOf(event)
	if etype != n.typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype))
	}

	n.lk.RLock()
	// Release the read lock on every exit path; without this the lock leaks
	// and any later write-lock attempt on the node blocks forever.
	defer n.lk.RUnlock()

	for _, ch := range n.sinks {
		ch <- event
	}
}

///////////////////////
// SUB

// sub is the Subscription value handed out by node.sub.
type sub struct {
	io.Closer

	// out is the receive-only end of the channel events are delivered on.
	out <-chan interface{}
}

// Events returns the channel from which the subscriber receives events.
func (s *sub) Events() <-chan interface{} {
	events := s.out
	return events
}

///////////////////////
// EMITTERS

// emitter is the Emitter value built by bus.Emitter. Both fields are
// embedded: Close comes from the io.Closer, and the methods of *node
// (including Emit) are promoted from the node the emitter is bound to.
type emitter struct {
	io.Closer
	*node
}

///////////////////////
// UTILS

func typePath(t reflect.Type) string {
	return t.PkgPath() + "/" + t.String()
}

// closer adapts a plain func() so that it can be used where a Close method
// is required.
type closer func()

// Close calls the wrapped function and always returns a nil error.
func (fn closer) Close() error {
	fn()
	return nil
}

// Compile-time checks that the concrete types satisfy their interfaces.
var (
	_ Bus          = (*bus)(nil)
	_ Subscription = (*sub)(nil)
	_ Emitter      = (*emitter)(nil)
	_ io.Closer    = closer(nil)
)