pubsub.go 2.1 KB
Newer Older
1
package tests
2 3 4 5 6

import (
	"context"
	"testing"
	"time"
Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9

	"github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/options"
10 11
)

12
func (tp *provider) TestPubSub(t *testing.T) {
13 14 15 16 17 18 19
	tp.hasApi(t, func(api iface.CoreAPI) error {
		if api.PubSub() == nil {
			return apiNotImplemented
		}
		return nil
	})

20
	t.Run("TestBasicPubSub", tp.TestBasicPubSub)
21 22
}

23
func (tp *provider) TestBasicPubSub(t *testing.T) {
24 25 26
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

27
	apis, err := tp.MakeAPISwarm(ctx, true, 2)
28 29 30 31 32 33 34 35 36 37 38 39 40
	if err != nil {
		t.Fatal(err)
	}

	sub, err := apis[0].PubSub().Subscribe(ctx, "testch")
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		tick := time.Tick(100 * time.Millisecond)

		for {
Łukasz Magiera's avatar
Łukasz Magiera committed
41
			err := apis[1].PubSub().Publish(ctx, "testch", []byte("hello world"))
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
			if err != nil {
				t.Fatal(err)
			}
			select {
			case <-tick:
			case <-ctx.Done():
				return
			}
		}
	}()

	m, err := sub.Next(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if string(m.Data()) != "hello world" {
		t.Errorf("got invalid data: %s", string(m.Data()))
	}

	self1, err := apis[1].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if m.From() != self1.ID() {
		t.Errorf("m.From didn't match")
	}

	peers, err := apis[1].PubSub().Peers(ctx, options.PubSub.Topic("testch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 1 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	self0, err := apis[0].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if peers[0] != self0.ID() {
		t.Errorf("peer didn't match")
	}

	peers, err = apis[1].PubSub().Peers(ctx, options.PubSub.Topic("nottestch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 0 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	topics, err := apis[0].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 1 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}

	if topics[0] != "testch" {
		t.Errorf("topic didn't match")
	}

	topics, err = apis[1].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 0 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}
}