session_test.go 10.4 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8
package session

import (
	"context"
	"sync"
	"testing"
	"time"

9
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
hannahhoward's avatar
hannahhoward committed
10
	"github.com/ipfs/go-bitswap/testutil"
11
	blocks "github.com/ipfs/go-block-format"
hannahhoward's avatar
hannahhoward committed
12 13
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
14
	delay "github.com/ipfs/go-ipfs-delay"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
16 17 18
)

type wantReq struct {
19 20
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
21 22 23
}

type fakeWantManager struct {
24 25
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
26 27 28
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
29 30 31 32
	select {
	case fwm.wantReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
33 34 35
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
36 37 38 39
	select {
	case fwm.cancelReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
40 41 42
}

type fakePeerManager struct {
43
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
44
	peers                  []peer.ID
45
	findMorePeersRequested chan cid.Cid
hannahhoward's avatar
hannahhoward committed
46 47
}

48 49
func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
	select {
50
	case fpm.findMorePeersRequested <- k:
51 52
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
53 54 55
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
56 57
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
58 59 60 61 62
	return fpm.peers
}

// RecordPeerRequests is a no-op: the fake does not track outgoing requests.
func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
63
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
64
	fpm.peers = append(fpm.peers, p)
65
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
66 67
}

68 69 70 71 72 73 74 75 76 77
// fakeRequestSplitter is a stateless test double for the session's request
// splitter.
type fakeRequestSplitter struct{}

// SplitRequest performs no splitting: it returns a single partial request
// holding all of the given peers and keys.
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
	whole := &bssrs.PartialRequest{Peers: peers, Keys: keys}
	return []*bssrs.PartialRequest{whole}
}

// RecordDuplicateBlock and RecordUniqueBlock are no-ops: the fake splitter
// keeps no statistics about received blocks.
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock()    {}

hannahhoward's avatar
hannahhoward committed
78 79 80
func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
81 82 83
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
84
	fpm := &fakePeerManager{}
85
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
86
	id := testutil.GenerateSessionID()
87
	session := New(ctx, id, fwm, fpm, frs)
hannahhoward's avatar
hannahhoward committed
88
	blockGenerator := blocksutil.NewBlockGenerator()
89
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
90 91 92 93 94
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
95

hannahhoward's avatar
hannahhoward committed
96 97 98 99 100
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
101 102
	receivedWantReq := <-fwm.wantReqs

103
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
104 105 106 107 108 109 110
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
111
	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
112 113 114
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
115
	for i, p := range peers {
116
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
117 118 119 120 121 122 123 124 125 126 127 128 129 130
		select {
		case cancelBlock := <-cancelReqs:
			newCancelReqs = append(newCancelReqs, cancelBlock)
		case <-ctx.Done():
			t.Fatal("did not cancel block want")
		}

		select {
		case receivedBlock := <-getBlocksCh:
			receivedBlocks = append(receivedBlocks, receivedBlock)
		case <-ctx.Done():
			t.Fatal("Did not receive block!")
		}

131 132 133 134 135
		select {
		case wantBlock := <-wantReqs:
			newBlockReqs = append(newBlockReqs, wantBlock)
		default:
		}
hannahhoward's avatar
hannahhoward committed
136 137 138
	}

	// verify new peers were recorded
139
	fpm.lk.Lock()
140
	if len(fpm.peers) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
141 142 143 144 145 146 147
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
148
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
149 150 151 152

	// look at new interactions with want manager

	// should have cancelled each received block
153
	if len(newCancelReqs) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
154 155 156
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
157
	var newCidsRequested []cid.Cid
hannahhoward's avatar
hannahhoward committed
158 159 160 161
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
162
		newCidsRequested = append(newCidsRequested, w.cids...)
hannahhoward's avatar
hannahhoward committed
163 164 165
	}

	// full new round of cids should be requested
166
	if len(newCidsRequested) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
167 168 169 170 171
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
172
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
173 174 175 176
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
177 178 179 180 181 182 183 184 185 186 187 188 189 190
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

191
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
192
	defer cancel()
193 194 195
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
196
	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
197
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
198
	id := testutil.GenerateSessionID()
199
	session := New(ctx, id, fwm, fpm, frs)
200
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
201
	blockGenerator := blocksutil.NewBlockGenerator()
202
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
203 204 205 206 207 208 209 210 211
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

212
	// clear the initial block of wants
213 214 215 216 217
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
218

hannahhoward's avatar
hannahhoward committed
219
	// receive a block to trigger a tick reset
220 221 222
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
223 224
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	select {
	case <-cancelReqs:
	case <-ctx.Done():
		t.Fatal("Did not cancel block")
	}
	select {
	case <-getBlocksCh:
	case <-ctx.Done():
		t.Fatal("Did not get block")
	}
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
240 241

	// verify a broadcast was made
242 243 244 245 246 247 248 249 250 251
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
252
	}
253 254 255 256 257 258

	// wait for a request to get more peers to occur
	select {
	case <-fpm.findMorePeersRequested:
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
259 260
	}
}
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385

// TestSessionFailingToGetFirstBlock requests blocks that never arrive and
// checks that the session keeps rebroadcasting its whole want list, that the
// measured gap between consecutive rebroadcasts does not shrink, and that
// provider searches are requested after the first tick and again after the
// rebroadcast delay, but not on the consecutive ticks in between.
//
// The test shortens the package-level provider search and rebroadcast delays
// and restores them to 1s and 1min on exit.
func TestSessionFailingToGetFirstBlock(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
	defer cancel()
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
	fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
	frs := &fakeRequestSplitter{}
	id := testutil.GenerateSessionID()
	SetProviderSearchDelay(10 * time.Millisecond)
	defer SetProviderSearchDelay(1 * time.Second)
	SetRebroadcastDelay(delay.Fixed(100 * time.Millisecond))
	defer SetRebroadcastDelay(delay.Fixed(1 * time.Minute))
	session := New(ctx, id, fwm, fpm, frs)
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(4)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}

	// expectBroadcast waits for the next want request and fails the test
	// unless it is an untargeted request covering every requested cid.
	expectBroadcast := func() {
		t.Helper()
		select {
		case receivedWantReq := <-wantReqs:
			if len(receivedWantReq.cids) < len(cids) {
				t.Fatal("did not rebroadcast whole live list")
			}
			if receivedWantReq.peers != nil {
				t.Fatal("did not make a broadcast")
			}
		case <-ctx.Done():
			t.Fatal("Never rebroadcast want list")
		}
	}

	// expectFindMorePeers waits for a provider search request and checks that
	// it targets one of the requested blocks; timeoutMsg is reported if ctx
	// ends first.
	expectFindMorePeers := func(timeoutMsg string) {
		t.Helper()
		select {
		case k := <-fpm.findMorePeersRequested:
			if testutil.IndexOf(blks, k) == -1 {
				t.Fatal("did not rebroadcast an active want")
			}
		case <-ctx.Done():
			t.Fatal(timeoutMsg)
		}
	}

	startTick := time.Now()
	_, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatalf("error getting blocks: %v", err)
	}

	// clear the initial block of wants
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}

	// verify a broadcast is made
	expectBroadcast()

	// wait for a request to get more peers to occur
	expectFindMorePeers("Did not find more peers")
	firstTickLength := time.Since(startTick)

	// wait for another broadcast to occur
	expectBroadcast()

	// time the first consecutive tick
	startTick = time.Now()
	expectBroadcast()
	consecutiveTickLength := time.Since(startTick)
	// tick should take longer
	if firstTickLength > consecutiveTickLength {
		t.Fatal("Should have increased tick length after first consecutive tick")
	}

	// time the second consecutive tick
	startTick = time.Now()
	expectBroadcast()
	secondConsecutiveTickLength := time.Since(startTick)
	// tick should take longer
	if consecutiveTickLength > secondConsecutiveTickLength {
		t.Fatal("Should have increased tick length after second consecutive tick")
	}

	// should not have looked for peers on consecutive ticks
	select {
	case <-fpm.findMorePeersRequested:
		t.Fatal("Should not have looked for peers on consecutive tick")
	default:
	}

	// wait for the next provider search request
	expectFindMorePeers("Did not rebroadcast to find more peers")
}