testnotifications.go 4.27 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
package testutil

import (
	"context"
	"sync"
	"testing"

	"github.com/ipfs/go-graphsync/notifications"
	"github.com/stretchr/testify/require"
)

type TestSubscriber struct {
	expectedTopic  notifications.Topic
	receivedEvents chan DispatchedEvent
	closed         chan notifications.Topic
}

type DispatchedEvent struct {
	Topic notifications.Topic
	Event notifications.Event
}

func NewTestSubscriber(bufferSize int) *TestSubscriber {
	return &TestSubscriber{
		receivedEvents: make(chan DispatchedEvent, bufferSize),
		closed:         make(chan notifications.Topic, bufferSize),
	}
}

func (ts *TestSubscriber) OnNext(topic notifications.Topic, ev notifications.Event) {
	ts.receivedEvents <- DispatchedEvent{topic, ev}
}

func (ts *TestSubscriber) OnClose(topic notifications.Topic) {
	ts.closed <- topic
}

func (ts *TestSubscriber) ExpectEvents(ctx context.Context, t *testing.T, events []DispatchedEvent) {
	for _, expectedEvent := range events {
		var event DispatchedEvent
		AssertReceive(ctx, t, ts.receivedEvents, &event, "should receive another event")
		require.Equal(t, expectedEvent, event)
	}
}

func (ts *TestSubscriber) NoEventsReceived(t *testing.T) {
	AssertChannelEmpty(t, ts.receivedEvents, "should have received no events")
}

func (ts *TestSubscriber) ExpectClosesAnyOrder(ctx context.Context, t *testing.T, topics []notifications.Topic) {
	expectedTopics := make(map[notifications.Topic]struct{})
	receivedTopics := make(map[notifications.Topic]struct{})
	for _, expectedTopic := range topics {
		expectedTopics[expectedTopic] = struct{}{}
		var topic notifications.Topic
		AssertReceive(ctx, t, ts.closed, &topic, "should receive another event")
		receivedTopics[topic] = struct{}{}
	}
	require.Equal(t, expectedTopics, receivedTopics)
}

func (ts *TestSubscriber) ExpectCloses(ctx context.Context, t *testing.T, topics []notifications.Topic) {
	for _, expectedTopic := range topics {
		var topic notifications.Topic
		AssertReceive(ctx, t, ts.closed, &topic, "should receive another event")
		require.Equal(t, expectedTopic, topic)
	}
}

type NotifeeVerifier struct {
	expectedTopic notifications.Topic
	subscriber    *TestSubscriber
}

func (nv *NotifeeVerifier) ExpectEvents(ctx context.Context, t *testing.T, events []notifications.Event) {
	dispatchedEvents := make([]DispatchedEvent, 0, len(events))
	for _, ev := range events {
		dispatchedEvents = append(dispatchedEvents, DispatchedEvent{nv.expectedTopic, ev})
	}
	nv.subscriber.ExpectEvents(ctx, t, dispatchedEvents)
}

func (nv *NotifeeVerifier) ExpectClose(ctx context.Context, t *testing.T) {
	nv.subscriber.ExpectCloses(ctx, t, []notifications.Topic{nv.expectedTopic})
}

func NewTestNotifee(topic notifications.Topic, bufferSize int) (notifications.Notifee, *NotifeeVerifier) {
	subscriber := NewTestSubscriber(bufferSize)
	return notifications.Notifee{
			Topic:      topic,
			Subscriber: notifications.NewMappableSubscriber(subscriber, notifications.IdentityTransform),
		}, &NotifeeVerifier{
			expectedTopic: topic,
			subscriber:    subscriber,
		}
}

type MockPublisher struct {
	notifeesLk sync.Mutex
	notifees   []notifications.Notifee
}

func (mp *MockPublisher) AddNotifees(notifees []notifications.Notifee) {
	mp.notifeesLk.Lock()
	mp.notifees = append(mp.notifees, notifees...)
	mp.notifeesLk.Unlock()
}

func (mp *MockPublisher) PublishMatchingEvents(shouldPublish func(notifications.Topic) bool, events []notifications.Event) {
	mp.notifeesLk.Lock()
	var newNotifees []notifications.Notifee
	for _, notifee := range mp.notifees {
		if shouldPublish(notifee.Topic) {
			for _, ev := range events {
				notifee.Subscriber.OnNext(notifee.Topic, ev)
			}
			notifee.Subscriber.OnClose(notifee.Topic)
		} else {
			newNotifees = append(newNotifees, notifee)
		}
	}
	mp.notifees = newNotifees
	mp.notifeesLk.Unlock()
}

func (mp *MockPublisher) PublishEvents(events []notifications.Event) {
	mp.PublishMatchingEvents(func(notifications.Topic) bool { return true }, events)
}

func (mp *MockPublisher) PublishEventsOnTopics(topics []notifications.Topic, events []notifications.Event) {
	shouldPublish := func(topic notifications.Topic) bool {
		for _, testTopic := range topics {
			if topic == testTopic {
				return true
			}
		}
		return false
	}
	mp.PublishMatchingEvents(shouldPublish, events)
}

func NewMockPublisher() *MockPublisher {
	return &MockPublisher{}
}