session_test.go 6.85 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8 9 10
package session

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-block-format"

11
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
hannahhoward's avatar
hannahhoward committed
12 13 14
	"github.com/ipfs/go-bitswap/testutil"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
hannahhoward's avatar
hannahhoward committed
16 17 18
)

type wantReq struct {
19 20
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
21 22 23
}

type fakeWantManager struct {
24 25
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
26 27 28
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
29 30 31 32
	select {
	case fwm.wantReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
33 34 35
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
36 37 38 39
	select {
	case fwm.cancelReqs <- wantReq{cids, peers}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
40 41 42
}

type fakePeerManager struct {
43
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
44
	peers                  []peer.ID
45
	findMorePeersRequested chan struct{}
hannahhoward's avatar
hannahhoward committed
46 47
}

48 49 50 51 52
func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
	select {
	case fpm.findMorePeersRequested <- struct{}{}:
	case <-ctx.Done():
	}
hannahhoward's avatar
hannahhoward committed
53 54 55
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
56 57
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
58 59 60 61 62
	return fpm.peers
}

// RecordPeerRequests is a no-op; the fake does not track outgoing requests.
func (fpm *fakePeerManager) RecordPeerRequests(_ []peer.ID, _ []cid.Cid) {
}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
63
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
64
	fpm.peers = append(fpm.peers, p)
65
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
66 67
}

68 69 70 71 72 73 74 75 76 77
// fakeRequestSplitter is a stateless test double for the session's request
// splitter.
type fakeRequestSplitter struct{}

// SplitRequest does no splitting: it returns exactly one partial request
// that carries all of the given peers and all of the given keys.
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
	whole := &bssrs.PartialRequest{
		Peers: peers,
		Keys:  keys,
	}
	return []*bssrs.PartialRequest{whole}
}

// RecordDuplicateBlock is a no-op; the fake keeps no duplicate statistics.
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {
}
// RecordUniqueBlock is a no-op; the fake keeps no unique-block statistics.
func (frs *fakeRequestSplitter) RecordUniqueBlock() {
}

hannahhoward's avatar
hannahhoward committed
78 79 80
func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
81 82 83
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
84
	fpm := &fakePeerManager{}
85
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
86
	id := testutil.GenerateSessionID()
87
	session := New(ctx, id, fwm, fpm, frs)
hannahhoward's avatar
hannahhoward committed
88
	blockGenerator := blocksutil.NewBlockGenerator()
89
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
90 91 92 93 94
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
95

hannahhoward's avatar
hannahhoward committed
96 97 98 99 100
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
101 102
	receivedWantReq := <-fwm.wantReqs

103
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
104 105 106 107 108 109 110
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
111
	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
112 113 114
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
115
	for i, p := range peers {
116
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
117 118 119 120 121 122 123 124 125 126 127 128 129 130
		select {
		case cancelBlock := <-cancelReqs:
			newCancelReqs = append(newCancelReqs, cancelBlock)
		case <-ctx.Done():
			t.Fatal("did not cancel block want")
		}

		select {
		case receivedBlock := <-getBlocksCh:
			receivedBlocks = append(receivedBlocks, receivedBlock)
		case <-ctx.Done():
			t.Fatal("Did not receive block!")
		}

131 132 133 134 135
		select {
		case wantBlock := <-wantReqs:
			newBlockReqs = append(newBlockReqs, wantBlock)
		default:
		}
hannahhoward's avatar
hannahhoward committed
136 137 138
	}

	// verify new peers were recorded
139
	fpm.lk.Lock()
140
	if len(fpm.peers) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
141 142 143 144 145 146 147
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
148
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
149 150 151 152

	// look at new interactions with want manager

	// should have cancelled each received block
153
	if len(newCancelReqs) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
154 155 156
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
157
	var newCidsRequested []cid.Cid
hannahhoward's avatar
hannahhoward committed
158 159 160 161
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
162
		newCidsRequested = append(newCidsRequested, w.cids...)
hannahhoward's avatar
hannahhoward committed
163 164 165
	}

	// full new round of cids should be requested
166
	if len(newCidsRequested) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
167 168 169 170 171
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
172
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
173 174 175 176
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
177 178 179 180 181 182 183 184 185 186 187 188 189 190
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

191
	ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
hannahhoward's avatar
hannahhoward committed
192
	defer cancel()
193 194 195
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
196
	fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
197
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
198
	id := testutil.GenerateSessionID()
199
	session := New(ctx, id, fwm, fpm, frs)
200
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
201
	blockGenerator := blocksutil.NewBlockGenerator()
202
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
203 204 205 206 207 208 209 210 211
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

212
	// clear the initial block of wants
213 214 215 216 217
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make first want request ")
	}
218

hannahhoward's avatar
hannahhoward committed
219
	// receive a block to trigger a tick reset
220 221 222
	time.Sleep(20 * time.Millisecond) // need to make sure some latency registers
	// or there will be no tick set -- time precision on Windows in go is in the
	// millisecond range
hannahhoward's avatar
hannahhoward committed
223 224
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	select {
	case <-cancelReqs:
	case <-ctx.Done():
		t.Fatal("Did not cancel block")
	}
	select {
	case <-getBlocksCh:
	case <-ctx.Done():
		t.Fatal("Did not get block")
	}
	select {
	case <-wantReqs:
	case <-ctx.Done():
		t.Fatal("Did not make second want request ")
	}
hannahhoward's avatar
hannahhoward committed
240 241

	// verify a broadcast was made
242 243 244 245 246 247 248 249 250 251
	select {
	case receivedWantReq := <-wantReqs:
		if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
			t.Fatal("did not rebroadcast whole live list")
		}
		if receivedWantReq.peers != nil {
			t.Fatal("did not make a broadcast")
		}
	case <-ctx.Done():
		t.Fatal("Never rebroadcast want list")
hannahhoward's avatar
hannahhoward committed
252
	}
253 254 255 256 257 258

	// wait for a request to get more peers to occur
	select {
	case <-fpm.findMorePeersRequested:
	case <-ctx.Done():
		t.Fatal("Did not find more peers")
hannahhoward's avatar
hannahhoward committed
259 260
	}
}