sessionpeermanager_test.go 6.99 KB
Newer Older
1 2 3 4
package sessionpeermanager

import (
	"context"
hannahhoward's avatar
hannahhoward committed
5
	"math/rand"
6
	"sync"
7 8 9 10 11 12
	"testing"
	"time"

	"github.com/ipfs/go-bitswap/testutil"

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

16 17 18
type fakePeerProviderFinder struct {
	peers     []peer.ID
	completed chan struct{}
19 20
}

21
func (fppf *fakePeerProviderFinder) FindProvidersAsync(ctx context.Context, c cid.Cid) <-chan peer.ID {
22 23
	peerCh := make(chan peer.ID)
	go func() {
24 25

		for _, p := range fppf.peers {
26 27 28
			select {
			case peerCh <- p:
			case <-ctx.Done():
29
				close(peerCh)
30 31 32
				return
			}
		}
33 34 35
		close(peerCh)

		select {
36
		case fppf.completed <- struct{}{}:
37 38
		case <-ctx.Done():
		}
39 40 41 42
	}()
	return peerCh
}

43
type fakePeerTagger struct {
44
	lk          sync.Mutex
45
	taggedPeers []peer.ID
46
	wait        sync.WaitGroup
47 48
}

49 50
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)
51 52 53

	fpt.lk.Lock()
	defer fpt.lk.Unlock()
54
	fpt.taggedPeers = append(fpt.taggedPeers, p)
55
}
56

57 58 59
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

60 61
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
62 63 64 65
	for i := 0; i < len(fpt.taggedPeers); i++ {
		if fpt.taggedPeers[i] == p {
			fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
			fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
66 67 68 69
			return
		}
	}
}
70

71 72 73 74 75 76
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
	return len(fpt.taggedPeers)
}

77 78 79 80 81 82 83 84 85
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	optimizedPeers := sessionPeerManager.GetOptimizedPeers()
	var peers []peer.ID
	for _, optimizedPeer := range optimizedPeers {
		peers = append(peers, optimizedPeer.Peer)
	}
	return peers
}

86 87 88 89
func TestFindingMorePeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
90 91
	completed := make(chan struct{})

92
	peers := testutil.GeneratePeers(5)
93 94
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
95 96 97
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

98
	sessionPeerManager := New(ctx, id, fpt, fppf)
99 100 101 102

	findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer findCancel()
	sessionPeerManager.FindMorePeers(ctx, c)
103 104 105 106 107 108 109
	select {
	case <-completed:
	case <-findCtx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

110
	sessionPeers := getPeers(sessionPeerManager)
111 112 113 114 115 116 117 118
	if len(sessionPeers) != len(peers) {
		t.Fatal("incorrect number of peers found")
	}
	for _, p := range sessionPeers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer found through finding providers")
		}
	}
119
	if len(fpt.taggedPeers) != len(peers) {
120 121 122 123 124 125 126 127 128
		t.Fatal("Peers were not tagged!")
	}
}

func TestRecordingReceivedBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
129 130
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{}
131 132 133
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

134
	sessionPeerManager := New(ctx, id, fpt, fppf)
135 136
	sessionPeerManager.RecordPeerResponse(p, c)
	time.Sleep(10 * time.Millisecond)
137
	sessionPeers := getPeers(sessionPeerManager)
138 139 140 141 142 143
	if len(sessionPeers) != 1 {
		t.Fatal("did not add peer on receive")
	}
	if sessionPeers[0] != p {
		t.Fatal("incorrect peer added on receive")
	}
144
	if len(fpt.taggedPeers) != 1 {
145 146 147 148
		t.Fatal("Peers was not tagged!")
	}
}

hannahhoward's avatar
hannahhoward committed
149 150
func TestOrderingPeers(t *testing.T) {
	ctx := context.Background()
151
	ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
152 153
	defer cancel()
	peers := testutil.GeneratePeers(100)
154
	completed := make(chan struct{})
155 156
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
hannahhoward's avatar
hannahhoward committed
157 158
	c := testutil.GenerateCids(1)
	id := testutil.GenerateSessionID()
159
	sessionPeerManager := New(ctx, id, fpt, fppf)
hannahhoward's avatar
hannahhoward committed
160 161 162

	// add all peers to session
	sessionPeerManager.FindMorePeers(ctx, c[0])
163 164 165 166 167 168
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
169 170 171 172 173 174 175 176 177 178

	// record broadcast
	sessionPeerManager.RecordPeerRequests(nil, c)

	// record receives
	peer1 := peers[rand.Intn(100)]
	peer2 := peers[rand.Intn(100)]
	peer3 := peers[rand.Intn(100)]
	time.Sleep(1 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer1, c[0])
179
	time.Sleep(5 * time.Millisecond)
hannahhoward's avatar
hannahhoward committed
180 181 182 183 184 185 186 187 188
	sessionPeerManager.RecordPeerResponse(peer2, c[0])
	time.Sleep(1 * time.Millisecond)
	sessionPeerManager.RecordPeerResponse(peer3, c[0])

	sessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(sessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

189
	// should prioritize peers which are fastest
190
	if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
hannahhoward's avatar
hannahhoward committed
191 192 193
		t.Fatal("Did not prioritize peers that received blocks")
	}

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	// should give first peer rating of 1
	if sessionPeers[0].OptimizationRating < 1.0 {
		t.Fatal("Did not assign rating to best peer correctly")
	}

	// should give other optimized peers ratings between 0 & 1
	if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
		(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
		t.Fatal("Did not assign rating to other optimized peers correctly")
	}

	// should other peers rating of zero
	for i := 3; i < maxOptimizedPeers; i++ {
		if sessionPeers[i].OptimizationRating != 0.0 {
			t.Fatal("Did not assign rating to unoptimized peer correctly")
		}
	}

212 213 214 215 216 217 218
	c2 := testutil.GenerateCids(1)

	// Request again
	sessionPeerManager.RecordPeerRequests(nil, c2)

	// Receive a second time
	sessionPeerManager.RecordPeerResponse(peer3, c2[0])
hannahhoward's avatar
hannahhoward committed
219 220 221 222 223 224 225

	// call again
	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
	if len(nextSessionPeers) != maxOptimizedPeers {
		t.Fatal("Should not return more than the max of optimized peers")
	}

226
	// should sort by average latency
227 228
	if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
		(nextSessionPeers[2].Peer != peer2) {
229
		t.Fatal("Did not dedup peers which received multiple blocks")
hannahhoward's avatar
hannahhoward committed
230 231 232 233 234
	}

	// should randomize other peers
	totalSame := 0
	for i := 3; i < maxOptimizedPeers; i++ {
235
		if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
hannahhoward's avatar
hannahhoward committed
236 237 238 239 240 241 242
			totalSame++
		}
	}
	if totalSame >= maxOptimizedPeers-3 {
		t.Fatal("should not return the same random peers each time")
	}
}
243

244 245 246 247 248
func TestUntaggingPeers(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	peers := testutil.GeneratePeers(5)
249
	completed := make(chan struct{})
250 251
	fpt := &fakePeerTagger{}
	fppf := &fakePeerProviderFinder{peers, completed}
252 253 254
	c := testutil.GenerateCids(1)[0]
	id := testutil.GenerateSessionID()

255
	sessionPeerManager := New(ctx, id, fpt, fppf)
256 257

	sessionPeerManager.FindMorePeers(ctx, c)
258 259 260 261 262 263 264
	select {
	case <-completed:
	case <-ctx.Done():
		t.Fatal("Did not finish finding providers")
	}
	time.Sleep(2 * time.Millisecond)

265
	if fpt.count() != len(peers) {
266 267 268
		t.Fatal("Peers were not tagged!")
	}
	<-ctx.Done()
269
	fpt.wait.Wait()
270

271
	if fpt.count() != 0 {
272 273 274
		t.Fatal("Peers were not untagged!")
	}
}