pubsub.go 2.16 KB
Newer Older
1
package tests
2 3 4 5 6

import (
	"context"
	"testing"
	"time"
Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9

	"github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/options"
10 11
)

Łukasz Magiera's avatar
Łukasz Magiera committed
12
func (tp *TestSuite) TestPubSub(t *testing.T) {
13 14 15 16 17 18 19
	tp.hasApi(t, func(api iface.CoreAPI) error {
		if api.PubSub() == nil {
			return apiNotImplemented
		}
		return nil
	})

20
	t.Run("TestBasicPubSub", tp.TestBasicPubSub)
21 22
}

Łukasz Magiera's avatar
Łukasz Magiera committed
23
func (tp *TestSuite) TestBasicPubSub(t *testing.T) {
24 25 26
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

27
	apis, err := tp.MakeAPISwarm(ctx, true, 2)
28 29 30 31 32 33 34 35 36 37
	if err != nil {
		t.Fatal(err)
	}

	sub, err := apis[0].PubSub().Subscribe(ctx, "testch")
	if err != nil {
		t.Fatal(err)
	}

	go func() {
Steven Allen's avatar
Steven Allen committed
38 39
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
40 41

		for {
Łukasz Magiera's avatar
Łukasz Magiera committed
42
			err := apis[1].PubSub().Publish(ctx, "testch", []byte("hello world"))
43
			if err != nil {
44 45 46
				t.Error(err)
				cancel()
				return
47 48
			}
			select {
Steven Allen's avatar
Steven Allen committed
49
			case <-ticker.C:
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
			case <-ctx.Done():
				return
			}
		}
	}()

	m, err := sub.Next(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if string(m.Data()) != "hello world" {
		t.Errorf("got invalid data: %s", string(m.Data()))
	}

	self1, err := apis[1].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if m.From() != self1.ID() {
		t.Errorf("m.From didn't match")
	}

	peers, err := apis[1].PubSub().Peers(ctx, options.PubSub.Topic("testch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 1 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	self0, err := apis[0].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if peers[0] != self0.ID() {
		t.Errorf("peer didn't match")
	}

	peers, err = apis[1].PubSub().Peers(ctx, options.PubSub.Topic("nottestch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 0 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	topics, err := apis[0].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 1 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}

	if topics[0] != "testch" {
		t.Errorf("topic didn't match")
	}

	topics, err = apis[1].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 0 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}
}