pubsub.go 2.42 KB
Newer Older
1
package tests
2 3 4 5 6

import (
	"context"
	"testing"
	"time"
Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9

	"github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/options"
10 11
)

Łukasz Magiera's avatar
Łukasz Magiera committed
12
func (tp *TestSuite) TestPubSub(t *testing.T) {
13 14 15 16 17 18 19
	tp.hasApi(t, func(api iface.CoreAPI) error {
		if api.PubSub() == nil {
			return apiNotImplemented
		}
		return nil
	})

20
	t.Run("TestBasicPubSub", tp.TestBasicPubSub)
21 22
}

Łukasz Magiera's avatar
Łukasz Magiera committed
23
func (tp *TestSuite) TestBasicPubSub(t *testing.T) {
24 25 26
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

27
	apis, err := tp.MakeAPISwarm(ctx, true, 2)
28 29 30 31 32 33 34 35 36
	if err != nil {
		t.Fatal(err)
	}

	sub, err := apis[0].PubSub().Subscribe(ctx, "testch")
	if err != nil {
		t.Fatal(err)
	}

37
	done := make(chan struct{})
38
	go func() {
39 40
		defer close(done)

Steven Allen's avatar
Steven Allen committed
41 42
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
43 44

		for {
Łukasz Magiera's avatar
Łukasz Magiera committed
45
			err := apis[1].PubSub().Publish(ctx, "testch", []byte("hello world"))
46 47 48 49 50
			switch err {
			case nil:
			case context.Canceled:
				return
			default:
51 52 53
				t.Error(err)
				cancel()
				return
54 55
			}
			select {
Steven Allen's avatar
Steven Allen committed
56
			case <-ticker.C:
57 58 59 60 61 62
			case <-ctx.Done():
				return
			}
		}
	}()

63 64 65 66 67 68 69
	// Wait for the sender to finish before we return.
	// Otherwise, we can get random errors as publish fails.
	defer func() {
		cancel()
		<-done
	}()

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
	m, err := sub.Next(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if string(m.Data()) != "hello world" {
		t.Errorf("got invalid data: %s", string(m.Data()))
	}

	self1, err := apis[1].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if m.From() != self1.ID() {
		t.Errorf("m.From didn't match")
	}

	peers, err := apis[1].PubSub().Peers(ctx, options.PubSub.Topic("testch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 1 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	self0, err := apis[0].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if peers[0] != self0.ID() {
		t.Errorf("peer didn't match")
	}

	peers, err = apis[1].PubSub().Peers(ctx, options.PubSub.Topic("nottestch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 0 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	topics, err := apis[0].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 1 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}

	if topics[0] != "testch" {
		t.Errorf("topic didn't match")
	}

	topics, err = apis[1].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 0 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}
}