testnotifications.go 4.28 KB
Newer Older
1 2 3 4 5 6 7 8
package testutil

import (
	"context"
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
9 10

	"github.com/ipfs/go-graphsync/notifications"
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
)

type TestSubscriber struct {
	expectedTopic  notifications.Topic
	receivedEvents chan DispatchedEvent
	closed         chan notifications.Topic
}

type DispatchedEvent struct {
	Topic notifications.Topic
	Event notifications.Event
}

func NewTestSubscriber(bufferSize int) *TestSubscriber {
	return &TestSubscriber{
		receivedEvents: make(chan DispatchedEvent, bufferSize),
		closed:         make(chan notifications.Topic, bufferSize),
	}
}

func (ts *TestSubscriber) OnNext(topic notifications.Topic, ev notifications.Event) {
	ts.receivedEvents <- DispatchedEvent{topic, ev}
}

func (ts *TestSubscriber) OnClose(topic notifications.Topic) {
	ts.closed <- topic
}

func (ts *TestSubscriber) ExpectEvents(ctx context.Context, t *testing.T, events []DispatchedEvent) {
	for _, expectedEvent := range events {
		var event DispatchedEvent
		AssertReceive(ctx, t, ts.receivedEvents, &event, "should receive another event")
		require.Equal(t, expectedEvent, event)
	}
}

func (ts *TestSubscriber) NoEventsReceived(t *testing.T) {
	AssertChannelEmpty(t, ts.receivedEvents, "should have received no events")
}

func (ts *TestSubscriber) ExpectClosesAnyOrder(ctx context.Context, t *testing.T, topics []notifications.Topic) {
	expectedTopics := make(map[notifications.Topic]struct{})
	receivedTopics := make(map[notifications.Topic]struct{})
	for _, expectedTopic := range topics {
		expectedTopics[expectedTopic] = struct{}{}
		var topic notifications.Topic
		AssertReceive(ctx, t, ts.closed, &topic, "should receive another event")
		receivedTopics[topic] = struct{}{}
	}
	require.Equal(t, expectedTopics, receivedTopics)
}

func (ts *TestSubscriber) ExpectCloses(ctx context.Context, t *testing.T, topics []notifications.Topic) {
	for _, expectedTopic := range topics {
		var topic notifications.Topic
		AssertReceive(ctx, t, ts.closed, &topic, "should receive another event")
		require.Equal(t, expectedTopic, topic)
	}
}

type NotifeeVerifier struct {
	expectedTopic notifications.Topic
	subscriber    *TestSubscriber
}

func (nv *NotifeeVerifier) ExpectEvents(ctx context.Context, t *testing.T, events []notifications.Event) {
	dispatchedEvents := make([]DispatchedEvent, 0, len(events))
	for _, ev := range events {
		dispatchedEvents = append(dispatchedEvents, DispatchedEvent{nv.expectedTopic, ev})
	}
	nv.subscriber.ExpectEvents(ctx, t, dispatchedEvents)
}

func (nv *NotifeeVerifier) ExpectClose(ctx context.Context, t *testing.T) {
	nv.subscriber.ExpectCloses(ctx, t, []notifications.Topic{nv.expectedTopic})
}

88
func NewTestNotifee(data notifications.TopicData, bufferSize int) (notifications.Notifee, *NotifeeVerifier) {
89 90
	subscriber := NewTestSubscriber(bufferSize)
	return notifications.Notifee{
91
			Data:       data,
92
			Subscriber: notifications.NewTopicDataSubscriber(subscriber),
93
		}, &NotifeeVerifier{
94
			expectedTopic: data,
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
			subscriber:    subscriber,
		}
}

type MockPublisher struct {
	notifeesLk sync.Mutex
	notifees   []notifications.Notifee
}

func (mp *MockPublisher) AddNotifees(notifees []notifications.Notifee) {
	mp.notifeesLk.Lock()
	mp.notifees = append(mp.notifees, notifees...)
	mp.notifeesLk.Unlock()
}

110
func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.TopicData) bool, events []notifications.Event) {
111 112 113
	mp.notifeesLk.Lock()
	var newNotifees []notifications.Notifee
	for _, notifee := range mp.notifees {
114
		if shouldPublish(notifee.Data) {
115
			for _, ev := range events {
116
				notifee.Subscriber.Subscriber.OnNext(notifee.Data, ev)
117
			}
118
			notifee.Subscriber.Subscriber.OnClose(notifee.Data)
119 120 121 122 123 124 125 126 127
		} else {
			newNotifees = append(newNotifees, notifee)
		}
	}
	mp.notifees = newNotifees
	mp.notifeesLk.Unlock()
}

func (mp *MockPublisher) PublishEvents(events []notifications.Event) {
128
	mp.PublishMatchingEvents(func(notifications.TopicData) bool { return true }, events)
129 130
}

131 132 133 134
func (mp *MockPublisher) PublishEventsOnTopicData(data []notifications.TopicData, events []notifications.Event) {
	shouldPublish := func(topic notifications.TopicData) bool {
		for _, testTopicData := range data {
			if topic == testTopicData {
135 136 137 138 139 140 141 142 143 144 145
				return true
			}
		}
		return false
	}
	mp.PublishMatchingEvents(shouldPublish, events)
}

func NewMockPublisher() *MockPublisher {
	return &MockPublisher{}
}