data_subscriber.go 1.11 KB
Newer Older
1 2 3 4 5
package notifications

import "sync"

type TopicDataSubscriber struct {
6 7
	idMapLk sync.RWMutex
	data    map[Topic][]TopicData
8 9 10 11 12 13 14
	Subscriber
}

// NewTopicDataSubscriber produces a subscriber that will transform
// events and topics before passing them on to the given subscriber
func NewTopicDataSubscriber(sub Subscriber) *TopicDataSubscriber {
	return &TopicDataSubscriber{
15 16
		Subscriber: sub,
		data:       make(map[Topic][]TopicData),
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
	}
}

func (m *TopicDataSubscriber) AddTopicData(id Topic, data TopicData) {
	m.idMapLk.Lock()
	m.data[id] = append(m.data[id], data)
	m.idMapLk.Unlock()
}

func (m *TopicDataSubscriber) getData(id Topic) []TopicData {
	m.idMapLk.RLock()
	defer m.idMapLk.RUnlock()

	data, ok := m.data[id]
	if !ok {
		return []TopicData{}
	}
	newData := make([]TopicData, len(data))
	for i, d := range data {
		newData[i] = d
	}
	return newData
}

func (m *TopicDataSubscriber) OnNext(topic Topic, ev Event) {
	for _, data := range m.getData(topic) {
		m.Subscriber.OnNext(data, ev)
	}
}

func (m *TopicDataSubscriber) OnClose(topic Topic) {
	for _, data := range m.getData(topic) {
		m.Subscriber.OnClose(data)
	}
}