session_test.go 10.7 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8
package session

import (
	"context"
	"sync"
	"testing"
	"time"

9
	bssd "github.com/ipfs/go-bitswap/sessiondata"
hannahhoward's avatar
hannahhoward committed
10
	"github.com/ipfs/go-bitswap/testutil"
11
	blocks "github.com/ipfs/go-block-format"
hannahhoward's avatar
hannahhoward committed
12 13
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
14
	delay "github.com/ipfs/go-ipfs-delay"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
16 17 18
)

type wantReq struct {
19 20
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
21 22 23
}

type fakeWantManager struct {
24 25
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
26 27 28
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
29 30 31 32
	select {
	case fwm.wantReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
33 34 35
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
36 37 38 39
	select {
	case fwm.cancelReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
40 41 42
}

type fakePeerManager struct {
43
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
44
	peers                  []peer.ID
45
	findMorePeersRequested chan cid.Cid
hannahhoward's avatar
hannahhoward committed
46 47
}

48 49
func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
	select {
50
	case fpm.findMorePeersRequested <- k:
51 52
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
53 54
}

55
func (fpm *fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
56 57
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
58 59 60 61 62
	optimizedPeers := make([]bssd.OptimizedPeer, 0, len(fpm.peers))
	for _, peer := range fpm.peers {
		optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: 1.0})
	}
	return optimizedPeers
hannahhoward's avatar
hannahhoward committed
63 64 65 66
}

// RecordPeerRequests is a no-op; the fake does not track outgoing requests.
func (fpm *fakePeerManager) RecordPeerRequests(_ []peer.ID, _ []cid.Cid) {
}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
67
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
68
	fpm.peers = append(fpm.peers, p)
69
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
70
}
71
// RecordCancel is a no-op; the fake does not track cancellations.
func (fpm *fakePeerManager) RecordCancel(c cid.Cid) {
}
hannahhoward's avatar
hannahhoward committed
72

73 74 75
// fakeRequestSplitter is a stateless test double for the session's request
// splitter.
type fakeRequestSplitter struct{}

76 77 78 79 80 81
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
	peers := make([]peer.ID, len(optimizedPeers))
	for i, optimizedPeer := range optimizedPeers {
		peers[i] = optimizedPeer.Peer
	}
	return []bssd.PartialRequest{bssd.PartialRequest{Peers: peers, Keys: keys}}
82 83 84 85 86
}

// RecordDuplicateBlock is a no-op; the fake keeps no duplicate statistics.
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {
}
// RecordUniqueBlock is a no-op; the fake keeps no unique-block statistics.
func (frs *fakeRequestSplitter) RecordUniqueBlock() {
}

hannahhoward's avatar
hannahhoward committed
87 88 89
func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
90 91 92
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
93
	fpm := &fakePeerManager{}
94
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
95
	id := testutil.GenerateSessionID()
96
	session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
hannahhoward's avatar
hannahhoward committed
97
	blockGenerator := blocksutil.NewBlockGenerator()
98
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
99 100 101 102 103
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
104

hannahhoward's avatar
hannahhoward committed
105 106 107 108 109
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
110 111
	receivedWantReq := <-fwm.wantReqs

112
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
113 114 115 116 117 118 119
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
120
	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
121 122 123
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
124
	for i, p := range peers {
125
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
126 127 128 129 130 131 132 133 134 135 136 137 138 139
		select {
		case cancelBlock := <-cancelReqs:
			newCancelReqs = append(newCancelReqs, cancelBlock)
		case <-ctx.Done():
			t.Fatal("did not cancel block want")
		}

		select {
		case receivedBlock := <-getBlocksCh:
			receivedBlocks = append(receivedBlocks, receivedBlock)
		case <-ctx.Done():
			t.Fatal("Did not receive block!")
		}

140 141 142 143 144
		select {
		case wantBlock := <-wantReqs:
			newBlockReqs = append(newBlockReqs, wantBlock)
		default:
		}
hannahhoward's avatar
hannahhoward committed
145 146 147
	}

	// verify new peers were recorded
148
	fpm.lk.Lock()
149
	if len(fpm.peers) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
150 151 152 153 154 155 156
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
157
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
158 159 160 161

	// look at new interactions with want manager

	// should have cancelled each received block
162
	if len(newCancelReqs) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
163 164 165
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
166
	var newCidsRequested []cid.Cid
hannahhoward's avatar
hannahhoward committed
167 168 169 170
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
171
		newCidsRequested = append(newCidsRequested, w.cids...)
hannahhoward's avatar
hannahhoward committed
172 173 174
	}

	// full new round of cids should be requested
175
	if len(newCidsRequested) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
176 177 178 179 180
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
181
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
182 183 184 185
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

200
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
201
	defer cancel()
202 203 204
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
205
	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
206
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
207
	id := testutil.GenerateSessionID()
208
	session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
209
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
210
	blockGenerator := blocksutil.NewBlockGenerator()
211
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
212 213 214 215 216 217 218 219 220
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

221
	// clear the initial block of wants
222 223 224 225 226
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
227

hannahhoward's avatar
hannahhoward committed
228
	// receive a block to trigger a tick reset
229 230 231
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
232 233
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
	select {
	case <-cancelReqs:
	case <-ctx.Done():
		t.Fatal("Did not cancel block")
	}
	select {
	case <-getBlocksCh:
	case <-ctx.Done():
		t.Fatal("Did not get block")
	}
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
249 250

	// verify a broadcast was made
251 252 253 254 255 256 257 258 259 260
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
261
	}
262 263 264 265 266 267

	// wait for a request to get more peers to occur
	select {
	case <-fpm.findMorePeersRequested:
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
268 269
	}
}
270 271

func TestSessionFailingToGetFirstBlock(t *testing.T) {
272
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
273 274 275 276 277 278 279
	defer cancel()
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
	frs := &fakeRequestSplitter{}
	id := testutil.GenerateSessionID()
280

281
	session := New(ctx, id, fwm, fpm, frs, 10*time.Millisecond, delay.Fixed(100*time.Millisecond))
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(4)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	startTick := time.Now()
	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// clear the initial block of wants
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}

	// verify a broadcast is made
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}

	// wait for a request to get more peers to occur
	select {
	case k := <-fpm.findMorePeersRequested:
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
	}
	firstTickLength := time.Since(startTick)

	// wait for another broadcast to occur
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
	startTick = time.Now()
	// wait for another broadcast to occur
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
	consecutiveTickLength := time.Since(startTick)
	// tick should take longer
	if firstTickLength > consecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}
	startTick = time.Now()
	// wait for another broadcast to occur
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < len(cids) {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
	}
	secondConsecutiveTickLength := time.Since(startTick)
	// tick should take longer
	if consecutiveTickLength > secondConsecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}

	// should not have looked for peers on consecutive ticks
	select {
	case <-fpm.findMorePeersRequested:
		t.Fatal("Should not have looked for peers on consecutive tick")
	default:
	}

	// wait for rebroadcast to occur
	select {
	case k := <-fpm.findMorePeersRequested:
		if testutil.IndexOf(blks, k) == -1 {
			t.Fatal("did not rebroadcast an active want")
		}
	case <-ctx.Done():
		t.Fatal("Did not rebroadcast to find more peers")
	}
}