session_test.go 5.97 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8 9 10
package session

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-block-format"

11
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
hannahhoward's avatar
hannahhoward committed
12 13 14 15 16 17 18
	"github.com/ipfs/go-bitswap/testutil"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	peer "github.com/libp2p/go-libp2p-peer"
)

type wantReq struct {
19 20
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
21 22 23
}

type fakeWantManager struct {
24 25
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
26 27 28
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
29
	fwm.wantReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
30 31 32
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
33
	fwm.cancelReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
34 35 36
}

type fakePeerManager struct {
37
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
38
	peers                  []peer.ID
39
	findMorePeersRequested chan struct{}
hannahhoward's avatar
hannahhoward committed
40 41 42
}

func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
43
	fpm.findMorePeersRequested <- struct{}{}
hannahhoward's avatar
hannahhoward committed
44 45 46
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
47 48
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
49 50 51 52 53
	return fpm.peers
}

// RecordPeerRequests is a no-op; this fake does not track outgoing requests.
func (fpm *fakePeerManager) RecordPeerRequests(_ []peer.ID, _ []cid.Cid) {}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
54
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
55
	fpm.peers = append(fpm.peers, p)
56
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
57 58
}

59 60 61 62 63 64 65 66 67 68
// fakeRequestSplitter is a stateless test double for the session's request
// splitter.
type fakeRequestSplitter struct{}

// SplitRequest performs no splitting: it returns a single partial request
// carrying every peer and every key it was given.
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
	whole := &bssrs.PartialRequest{Peers: peers, Keys: keys}
	return []*bssrs.PartialRequest{whole}
}

// RecordDuplicateBlock is a no-op; this fake keeps no statistics.
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}

// RecordUniqueBlock is a no-op; this fake keeps no statistics.
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}

hannahhoward's avatar
hannahhoward committed
69 70 71
func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
72 73 74
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
75
	fpm := &fakePeerManager{}
76
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
77
	id := testutil.GenerateSessionID()
78
	session := New(ctx, id, fwm, fpm, frs)
hannahhoward's avatar
hannahhoward committed
79
	blockGenerator := blocksutil.NewBlockGenerator()
80
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
81 82 83 84 85
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
86

hannahhoward's avatar
hannahhoward committed
87 88 89 90 91
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
92 93
	receivedWantReq := <-fwm.wantReqs

94
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
95 96 97 98 99 100 101
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
102
	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
103 104 105
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
106
	for i, p := range peers {
107 108 109 110 111
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
112 113 114 115 116
		select {
		case wantBlock := <-wantReqs:
			newBlockReqs = append(newBlockReqs, wantBlock)
		default:
		}
hannahhoward's avatar
hannahhoward committed
117 118 119
	}

	// verify new peers were recorded
120
	fpm.lk.Lock()
121
	if len(fpm.peers) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
122 123 124 125 126 127 128
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
129
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
130 131 132 133

	// look at new interactions with want manager

	// should have cancelled each received block
134
	if len(newCancelReqs) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
135 136 137
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
138
	var newCidsRequested []cid.Cid
hannahhoward's avatar
hannahhoward committed
139 140 141 142
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
143
		newCidsRequested = append(newCidsRequested, w.cids...)
hannahhoward's avatar
hannahhoward committed
144 145 146
	}

	// full new round of cids should be requested
147
	if len(newCidsRequested) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
148 149 150 151 152
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
153
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
154 155 156 157
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
174 175 176
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
177
	fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
178
	frs := &fakeRequestSplitter{}
hannahhoward's avatar
hannahhoward committed
179
	id := testutil.GenerateSessionID()
180
	session := New(ctx, id, fwm, fpm, frs)
181
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
182
	blockGenerator := blocksutil.NewBlockGenerator()
183
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
184 185 186 187 188 189 190 191 192
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

193 194 195
	// clear the initial block of wants
	<-wantReqs

hannahhoward's avatar
hannahhoward committed
196
	// receive a block to trigger a tick reset
197
	time.Sleep(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
198 199
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
200 201 202
	<-getBlocksCh
	<-wantReqs
	<-cancelReqs
hannahhoward's avatar
hannahhoward committed
203

204
	// wait for a request to get more peers to occur
205
	<-fpm.findMorePeersRequested
hannahhoward's avatar
hannahhoward committed
206 207

	// verify a broadcast was made
208
	receivedWantReq := <-wantReqs
209
	if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
210 211 212 213 214
		t.Fatal("did not rebroadcast whole live list")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("did not make a broadcast")
	}
215
	<-ctx.Done()
hannahhoward's avatar
hannahhoward committed
216
}