// Source: dial_queue_test.go

package dht

import (
	"context"
	"sync"
Raúl Kripalani's avatar
Raúl Kripalani committed
6
	"sync/atomic"
7 8 9
	"testing"
	"time"

Raúl Kripalani's avatar
Raúl Kripalani committed
10 11
	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
12 13 14
)

func TestDialQueueGrowsOnSlowDials(t *testing.T) {
Raúl Kripalani's avatar
Raúl Kripalani committed
15

16 17 18
	in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
	hang := make(chan struct{})

Raúl Kripalani's avatar
Raúl Kripalani committed
19
	var cnt int32
20
	dialFn := func(ctx context.Context, p peer.ID) error {
Raúl Kripalani's avatar
Raúl Kripalani committed
21
		atomic.AddInt32(&cnt, 1)
22 23 24 25 26 27 28 29 30 31
		<-hang
		return nil
	}

	// Enqueue 20 jobs.
	for i := 0; i < 20; i++ {
		in.EnqChan <- peer.ID(i)
	}

	// remove the mute period to grow faster.
32
	dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
33 34

	for i := 0; i < 4; i++ {
35
		_ = dq.Consume()
36 37 38
		time.Sleep(100 * time.Millisecond)
	}

Raúl Kripalani's avatar
Raúl Kripalani committed
39 40 41 42 43 44
	for i := 0; i < 20; i++ {
		if atomic.LoadInt32(&cnt) > int32(DialQueueMinParallelism) {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
45

Raúl Kripalani's avatar
Raúl Kripalani committed
46
	t.Errorf("expected 19 concurrent dials, got %d", atomic.LoadInt32(&cnt))
47 48 49 50

}

func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
Raúl Kripalani's avatar
Raúl Kripalani committed
51 52
	// reduce interference from the other shrink path.

53 54 55
	in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
	hang := make(chan struct{})

Raúl Kripalani's avatar
Raúl Kripalani committed
56
	wg := new(sync.WaitGroup)
57 58 59 60 61 62 63
	wg.Add(13)
	dialFn := func(ctx context.Context, p peer.ID) error {
		wg.Done()
		<-hang
		return nil
	}

64
	dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
65 66 67 68

	// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
	// and immediately returnable.
	for i := 0; i < 3; i++ {
69
		_ = dq.Consume()
70 71
	}

Raúl Kripalani's avatar
Raúl Kripalani committed
72 73 74 75 76 77
	// Enqueue 13 jobs, one per worker we'll grow to.
	for i := 0; i < 13; i++ {
		in.EnqChan <- peer.ID(i)
	}

	waitForWg(t, wg, 2*time.Second)
78 79 80

	// Release a few dialFn, but not all of them because downscaling happens when workers detect there are no
	// consumers to consume their values. So the other three will be these witnesses.
Raúl Kripalani's avatar
Raúl Kripalani committed
81
	for i := 0; i < 3; i++ {
82 83 84 85
		hang <- struct{}{}
	}

	// allow enough time for signalling and dispatching values to outstanding consumers.
Raúl Kripalani's avatar
Raúl Kripalani committed
86
	time.Sleep(1 * time.Second)
87

Raúl Kripalani's avatar
Raúl Kripalani committed
88 89 90 91
	// unblock the rest.
	for i := 0; i < 10; i++ {
		hang <- struct{}{}
	}
92

Raúl Kripalani's avatar
Raúl Kripalani committed
93
	wg = new(sync.WaitGroup)
94 95 96
	// we should now only have 6 workers, because all the shrink events will have been honoured.
	wg.Add(6)

Raúl Kripalani's avatar
Raúl Kripalani committed
97 98
	// enqueue more jobs.
	for i := 0; i < 6; i++ {
99 100 101 102
		in.EnqChan <- peer.ID(i)
	}

	// let's check we have 6 workers hanging.
Raúl Kripalani's avatar
Raúl Kripalani committed
103
	waitForWg(t, wg, 2*time.Second)
104 105 106
}

// Inactivity = workers are idle because the DHT query is progressing slow and is producing too few peers to dial.
Raúl Kripalani's avatar
Raúl Kripalani committed
107
func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
	in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
	hang := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(13)
	dialFn := func(ctx context.Context, p peer.ID) error {
		wg.Done()
		<-hang
		return nil
	}

	// Enqueue 13 jobs.
	for i := 0; i < 13; i++ {
		in.EnqChan <- peer.ID(i)
	}

124
	dq := newDialQueue(context.Background(), "test", in, dialFn, time.Second, 0)
125 126 127

	// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
	for i := 0; i < 13; i++ {
128
		ch := dq.Consume()
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
		hang <- struct{}{}
		<-ch
		time.Sleep(100 * time.Millisecond)
	}

	// wait for MaxIdlePeriod.
	time.Sleep(1500 * time.Millisecond)

	// we should now only have 6 workers, because all the shrink events will have been honoured.
	wg.Add(6)

	// enqueue more jobs
	for i := 0; i < 10; i++ {
		in.EnqChan <- peer.ID(i)
	}

	// let's check we have 6 workers hanging.
	waitForWg(t, &wg, 2*time.Second)
}

func TestDialQueueMutePeriodHonored(t *testing.T) {
	in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
	hang := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(6)
	dialFn := func(ctx context.Context, p peer.ID) error {
		wg.Done()
		<-hang
		return nil
	}

	// Enqueue a bunch of jobs.
	for i := 0; i < 20; i++ {
		in.EnqChan <- peer.ID(i)
	}

165
	dq := newDialQueue(context.Background(), "test", in, dialFn, DialQueueMaxIdle, 2*time.Second)
166 167 168

	// pick up three consumers.
	for i := 0; i < 3; i++ {
169
		_ = dq.Consume()
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
		time.Sleep(100 * time.Millisecond)
	}

	time.Sleep(500 * time.Millisecond)

	// we'll only have 6 workers because the grow signals have been ignored.
	waitForWg(t, &wg, 2*time.Second)
}

// waitForWg blocks until wg's counter reaches zero or until wait has elapsed, whichever comes
// first. On timeout it records a test error via t.Error and returns; it does not abort the test.
// It is marked as a test helper so failures are attributed to the caller's line.
func waitForWg(t *testing.T, wg *sync.WaitGroup, wait time.Duration) {
	t.Helper()

	// WaitGroup.Wait cannot be used in a select directly, so a goroutine turns its return into a
	// channel close.
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
		// Counter reached zero in time; nothing to report.
	case <-time.After(wait):
		t.Error("timeout while waiting for WaitGroup")
	}
}