notifications_test.go 3.95 KB
Newer Older
1 2 3 4
package notifications

import (
	"bytes"
5
	"context"
6 7 8
	"testing"
	"time"

9 10 11
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	blocksutil "gitlab.dms3.io/dms3/go-dms3-blocksutil"
12 13
)

14 15 16 17 18 19
func TestDuplicates(t *testing.T) {
	b1 := blocks.NewBlock([]byte("1"))
	b2 := blocks.NewBlock([]byte("2"))

	n := New()
	defer n.Shutdown()
20
	ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38

	n.Publish(b1)
	blockRecvd, ok := <-ch
	if !ok {
		t.Fail()
	}
	assertBlocksEqual(t, b1, blockRecvd)

	n.Publish(b1) // ignored duplicate

	n.Publish(b2)
	blockRecvd, ok = <-ch
	if !ok {
		t.Fail()
	}
	assertBlocksEqual(t, b2, blockRecvd)
}

39
func TestPublishSubscribe(t *testing.T) {
40
	blockSent := blocks.NewBlock([]byte("Greetings from The Interval"))
41 42 43

	n := New()
	defer n.Shutdown()
44
	ch := n.Subscribe(context.Background(), blockSent.Cid())
45

Jeromy's avatar
Jeromy committed
46
	n.Publish(blockSent)
47 48 49 50 51
	blockRecvd, ok := <-ch
	if !ok {
		t.Fail()
	}

Jeromy's avatar
Jeromy committed
52
	assertBlocksEqual(t, blockRecvd, blockSent)
53 54 55

}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
56
func TestSubscribeMany(t *testing.T) {
57 58
	e1 := blocks.NewBlock([]byte("1"))
	e2 := blocks.NewBlock([]byte("2"))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60 61

	n := New()
	defer n.Shutdown()
62
	ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

	n.Publish(e1)
	r1, ok := <-ch
	if !ok {
		t.Fatal("didn't receive first expected block")
	}
	assertBlocksEqual(t, e1, r1)

	n.Publish(e2)
	r2, ok := <-ch
	if !ok {
		t.Fatal("didn't receive second expected block")
	}
	assertBlocksEqual(t, e2, r2)
}

79 80 81 82 83 84 85
// TestDuplicateSubscribe tests a scenario where a given block
// would be requested twice at the same time.
func TestDuplicateSubscribe(t *testing.T) {
	e1 := blocks.NewBlock([]byte("1"))

	n := New()
	defer n.Shutdown()
86 87
	ch1 := n.Subscribe(context.Background(), e1.Cid())
	ch2 := n.Subscribe(context.Background(), e1.Cid())
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

	n.Publish(e1)
	r1, ok := <-ch1
	if !ok {
		t.Fatal("didn't receive first expected block")
	}
	assertBlocksEqual(t, e1, r1)

	r2, ok := <-ch2
	if !ok {
		t.Fatal("didn't receive second expected block")
	}
	assertBlocksEqual(t, e1, r2)
}

103 104 105 106 107 108 109 110 111 112 113 114 115 116
func TestShutdownBeforeUnsubscribe(t *testing.T) {
	e1 := blocks.NewBlock([]byte("1"))

	n := New()
	ctx, cancel := context.WithCancel(context.Background())
	ch := n.Subscribe(ctx, e1.Cid()) // no keys provided
	n.Shutdown()
	cancel()

	select {
	case _, ok := <-ch:
		if ok {
			t.Fatal("channel should have been closed")
		}
Steven Allen's avatar
Steven Allen committed
117
	case <-time.After(5 * time.Second):
118 119 120 121
		t.Fatal("channel should have been closed")
	}
}

122 123 124
func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) {
	n := New()
	defer n.Shutdown()
125
	ch := n.Subscribe(context.Background()) // no keys provided
126 127 128 129 130
	if _, ok := <-ch; ok {
		t.Fatal("should be closed if no keys provided")
	}
}

131 132 133
func TestCarryOnWhenDeadlineExpires(t *testing.T) {

	impossibleDeadline := time.Nanosecond
rht's avatar
rht committed
134 135
	fastExpiringCtx, cancel := context.WithTimeout(context.Background(), impossibleDeadline)
	defer cancel()
136 137 138

	n := New()
	defer n.Shutdown()
139
	block := blocks.NewBlock([]byte("A Missed Connection"))
140
	blockChannel := n.Subscribe(fastExpiringCtx, block.Cid())
141 142 143 144

	assertBlockChannelNil(t, blockChannel)
}

145 146 147 148 149 150 151 152 153
func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {

	g := blocksutil.NewBlockGenerator()
	ctx, cancel := context.WithCancel(context.Background())
	n := New()
	defer n.Shutdown()

	t.Log("generate a large number of blocks. exceed default buffer")
	bs := g.Blocks(1000)
154 155
	ks := func() []cid.Cid {
		var keys []cid.Cid
156
		for _, b := range bs {
157
			keys = append(keys, b.Cid())
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
		}
		return keys
	}()

	_ = n.Subscribe(ctx, ks...) // ignore received channel

	t.Log("cancel context before any blocks published")
	cancel()
	for _, b := range bs {
		n.Publish(b)
	}

	t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
}

173
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
174 175 176 177 178 179
	_, ok := <-blockChannel
	if ok {
		t.Fail()
	}
}

180
func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
Jeromy's avatar
Jeromy committed
181
	if !bytes.Equal(a.RawData(), b.RawData()) {
182
		t.Fatal("blocks aren't equal")
183
	}
184
	if a.Cid() != b.Cid() {
185
		t.Fatal("block keys aren't equal")
186 187
	}
}