session_test.go 5.5 KB
Newer Older
hannahhoward's avatar
hannahhoward committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
package session

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-block-format"

	"github.com/ipfs/go-bitswap/testutil"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	peer "github.com/libp2p/go-libp2p-peer"
)

type wantReq struct {
18 19
	cids  []cid.Cid
	peers []peer.ID
hannahhoward's avatar
hannahhoward committed
20 21 22
}

type fakeWantManager struct {
23 24
	wantReqs   chan wantReq
	cancelReqs chan wantReq
hannahhoward's avatar
hannahhoward committed
25 26 27
}

func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
28
	fwm.wantReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
29 30 31
}

func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
32
	fwm.cancelReqs <- wantReq{cids, peers}
hannahhoward's avatar
hannahhoward committed
33 34 35
}

type fakePeerManager struct {
36
	lk                     sync.RWMutex
hannahhoward's avatar
hannahhoward committed
37
	peers                  []peer.ID
38
	findMorePeersRequested chan struct{}
hannahhoward's avatar
hannahhoward committed
39 40 41
}

func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
42
	fpm.findMorePeersRequested <- struct{}{}
hannahhoward's avatar
hannahhoward committed
43 44 45
}

func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
46 47
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
48 49 50 51 52
	return fpm.peers
}

func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
53
	fpm.lk.Lock()
hannahhoward's avatar
hannahhoward committed
54
	fpm.peers = append(fpm.peers, p)
55
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
56 57 58 59 60
}

func TestSessionGetBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
61 62 63
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
hannahhoward's avatar
hannahhoward committed
64 65 66 67
	fpm := &fakePeerManager{}
	id := testutil.GenerateSessionID()
	session := New(ctx, id, fwm, fpm)
	blockGenerator := blocksutil.NewBlockGenerator()
68
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
69 70 71 72 73
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
74

hannahhoward's avatar
hannahhoward committed
75 76 77 78 79
	if err != nil {
		t.Fatal("error getting blocks")
	}

	// check initial want request
80 81
	receivedWantReq := <-fwm.wantReqs

82
	if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
83 84 85 86 87 88 89
		t.Fatal("did not enqueue correct initial number of wants")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("first want request should be a broadcast")
	}

	// now receive the first set of blocks
90
	peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
91 92 93
	var newCancelReqs []wantReq
	var newBlockReqs []wantReq
	var receivedBlocks []blocks.Block
hannahhoward's avatar
hannahhoward committed
94
	for i, p := range peers {
95 96 97 98 99
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
100 101 102 103 104
		select {
		case wantBlock := <-wantReqs:
			newBlockReqs = append(newBlockReqs, wantBlock)
		default:
		}
hannahhoward's avatar
hannahhoward committed
105 106 107
	}

	// verify new peers were recorded
108
	fpm.lk.Lock()
109
	if len(fpm.peers) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
110 111 112 113 114 115 116
		t.Fatal("received blocks not recorded by the peer manager")
	}
	for _, p := range fpm.peers {
		if !testutil.ContainsPeer(peers, p) {
			t.Fatal("incorrect peer recorded to peer manager")
		}
	}
117
	fpm.lk.Unlock()
hannahhoward's avatar
hannahhoward committed
118 119 120 121

	// look at new interactions with want manager

	// should have cancelled each received block
122
	if len(newCancelReqs) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
123 124 125
		t.Fatal("did not cancel each block once it was received")
	}
	// new session reqs should be targeted
126
	var newCidsRequested []cid.Cid
hannahhoward's avatar
hannahhoward committed
127 128 129 130
	for _, w := range newBlockReqs {
		if len(w.peers) == 0 {
			t.Fatal("should not have broadcast again after initial broadcast")
		}
131
		newCidsRequested = append(newCidsRequested, w.cids...)
hannahhoward's avatar
hannahhoward committed
132 133 134
	}

	// full new round of cids should be requested
135
	if len(newCidsRequested) != broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
136 137 138 139 140
		t.Fatal("new blocks were not requested")
	}

	// receive remaining blocks
	for i, p := range peers {
141
		session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
142 143 144 145
		receivedBlock := <-getBlocksCh
		receivedBlocks = append(receivedBlocks, receivedBlock)
		cancelBlock := <-cancelReqs
		newCancelReqs = append(newCancelReqs, cancelBlock)
hannahhoward's avatar
hannahhoward committed
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	}

	if len(receivedBlocks) != len(blks) {
		t.Fatal("did not receive enough blocks")
	}
	for _, block := range receivedBlocks {
		if !testutil.ContainsBlock(blks, block) {
			t.Fatal("received incorrect block")
		}
	}
}

func TestSessionFindMorePeers(t *testing.T) {

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
162 163 164
	wantReqs := make(chan wantReq, 1)
	cancelReqs := make(chan wantReq, 1)
	fwm := &fakeWantManager{wantReqs, cancelReqs}
165
	fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
hannahhoward's avatar
hannahhoward committed
166 167
	id := testutil.GenerateSessionID()
	session := New(ctx, id, fwm, fpm)
168
	session.SetBaseTickDelay(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
169
	blockGenerator := blocksutil.NewBlockGenerator()
170
	blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
hannahhoward's avatar
hannahhoward committed
171 172 173 174 175 176 177 178 179
	var cids []cid.Cid
	for _, block := range blks {
		cids = append(cids, block.Cid())
	}
	getBlocksCh, err := session.GetBlocks(ctx, cids)
	if err != nil {
		t.Fatal("error getting blocks")
	}

180 181 182
	// clear the initial block of wants
	<-wantReqs

hannahhoward's avatar
hannahhoward committed
183
	// receive a block to trigger a tick reset
184
	time.Sleep(200 * time.Microsecond)
hannahhoward's avatar
hannahhoward committed
185 186
	p := testutil.GeneratePeers(1)[0]
	session.ReceiveBlockFrom(p, blks[0])
187 188 189
	<-getBlocksCh
	<-wantReqs
	<-cancelReqs
hannahhoward's avatar
hannahhoward committed
190

191
	// wait for a request to get more peers to occur
192
	<-fpm.findMorePeersRequested
hannahhoward's avatar
hannahhoward committed
193 194

	// verify a broadcast was made
195
	receivedWantReq := <-wantReqs
196
	if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
hannahhoward's avatar
hannahhoward committed
197 198 199 200 201
		t.Fatal("did not rebroadcast whole live list")
	}
	if receivedWantReq.peers != nil {
		t.Fatal("did not make a broadcast")
	}
202
	<-ctx.Done()
hannahhoward's avatar
hannahhoward committed
203
}