pubsub.go 2.12 KB
Newer Older
1
package tests
2 3 4

import (
	"context"
5
	"github.com/ipfs/go-ipfs/core/coreapi/interface"
6 7 8 9 10
	"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
	"testing"
	"time"
)

11
func (tp *provider) TestPubSub(t *testing.T) {
12 13 14 15 16 17 18
	tp.hasApi(t, func(api iface.CoreAPI) error {
		if api.PubSub() == nil {
			return apiNotImplemented
		}
		return nil
	})

19
	t.Run("TestBasicPubSub", tp.TestBasicPubSub)
20 21
}

22
func (tp *provider) TestBasicPubSub(t *testing.T) {
23 24 25
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

26
	apis, err := tp.MakeAPISwarm(ctx, true, 2)
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
	if err != nil {
		t.Fatal(err)
	}

	sub, err := apis[0].PubSub().Subscribe(ctx, "testch")
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		tick := time.Tick(100 * time.Millisecond)

		for {
			err = apis[1].PubSub().Publish(ctx, "testch", []byte("hello world"))
			if err != nil {
				t.Fatal(err)
			}
			select {
			case <-tick:
			case <-ctx.Done():
				return
			}
		}
	}()

	m, err := sub.Next(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if string(m.Data()) != "hello world" {
		t.Errorf("got invalid data: %s", string(m.Data()))
	}

	self1, err := apis[1].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if m.From() != self1.ID() {
		t.Errorf("m.From didn't match")
	}

	peers, err := apis[1].PubSub().Peers(ctx, options.PubSub.Topic("testch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 1 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	self0, err := apis[0].Key().Self(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if peers[0] != self0.ID() {
		t.Errorf("peer didn't match")
	}

	peers, err = apis[1].PubSub().Peers(ctx, options.PubSub.Topic("nottestch"))
	if err != nil {
		t.Fatal(err)
	}

	if len(peers) != 0 {
		t.Fatalf("got incorrect number of peers: %d", len(peers))
	}

	topics, err := apis[0].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 1 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}

	if topics[0] != "testch" {
		t.Errorf("topic didn't match")
	}

	topics, err = apis[1].PubSub().Ls(ctx)
	if err != nil {
		t.Fatal(err)
	}

	if len(topics) != 0 {
		t.Fatalf("got incorrect number of topics: %d", len(peers))
	}
}