session_test.go 5.37 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
package session

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-block-format"

	"github.com/ipfs/go-bitswap/testutil"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	peer "github.com/libp2p/go-libp2p-peer"
)

type wantReq struct {
18 19
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
20 21 22
}

type fakeWantManager struct {
23 24
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
25 26 27
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
28
	fwm.wantReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
29 30 31
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
32
	fwm.cancelReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
33 34 35
}

type fakePeerManager struct {
36
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
37
	peers                  []peer.ID
38
	findMorePeersRequested chan struct{}
hannahhoward's avatar
hannahhoward committed
39 40 41
}

func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
42
	fpm.findMorePeersRequested <- struct{}{}
hannahhoward's avatar
hannahhoward committed
43 44 45
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
46 47
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
48 49 50 51 52
	return fpm.peers
}

func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
53
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
54
	fpm.peers = append(fpm.peers, p)
55
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
56 57 58 59 60
}

func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
61 62 63
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
64 65 66 67 68 69 70 71 72 73
	fpm := &fakePeerManager{}
	id := testutil.GenerateSessionID()
	session := New(ctx, id, fwm, fpm)
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(activeWantsLimit * 2)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
74

hannahhoward's avatar
hannahhoward committed
75 76 77 78 79
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
80 81
	receivedWantReq := <-fwm.wantReqs

hannahhoward's avatar
hannahhoward committed
82 83 84 85 86 87 88 89 90
	if len(receivedWantReq.cids) != activeWantsLimit {
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
	peers := testutil.GeneratePeers(activeWantsLimit)
91 92 93
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
94
	for i, p := range peers {
95 96 97 98 99 100 101
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
		wantBlock := <-wantReqs
		newBlockReqs = append(newBlockReqs, wantBlock)
hannahhoward's avatar
hannahhoward committed
102 103 104
	}

	// verify new peers were recorded
105
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
106 107 108 109 110 111 112 113
	if len(fpm.peers) != activeWantsLimit {
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
114
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
115 116 117 118

	// look at new interactions with want manager

	// should have cancelled each received block
119
	if len(newCancelReqs) != activeWantsLimit {
hannahhoward's avatar
hannahhoward committed
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
	totalEnqueued := 0
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
		totalEnqueued += len(w.cids)
	}

	// full new round of cids should be requested
	if totalEnqueued != activeWantsLimit {
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
138 139 140 141 142
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
159 160 161
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
162
	fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
hannahhoward's avatar
hannahhoward committed
163 164
	id := testutil.GenerateSessionID()
	session := New(ctx, id, fwm, fpm)
165
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
166 167 168 169 170 171 172 173 174 175 176
	blockGenerator := blocksutil.NewBlockGenerator()
	blks := blockGenerator.Blocks(activeWantsLimit * 2)
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

177 178 179
	// clear the initial block of wants
	<-wantReqs

hannahhoward's avatar
hannahhoward committed
180
	// receive a block to trigger a tick reset
181
	time.Sleep(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
182 183
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
184 185 186
	<-getBlocksCh
	<-wantReqs
	<-cancelReqs
hannahhoward's avatar
hannahhoward committed
187 188

	// wait long enough for a tick to occur
189
	<-fpm.findMorePeersRequested
hannahhoward's avatar
hannahhoward committed
190 191

	// verify a broadcast was made
192
	receivedWantReq := <-wantReqs
hannahhoward's avatar
hannahhoward committed
193 194 195 196 197 198
	if len(receivedWantReq.cids) != activeWantsLimit {
		t.Fatal("did not rebroadcast whole live list")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("did not make a broadcast")
	}
199
	<-ctx.Done()
hannahhoward's avatar
hannahhoward committed
200
}