sessionmanager_test.go 6.93 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10
	bssession "github.com/ipfs/go-bitswap/session"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
12
	"github.com/ipfs/go-bitswap/testutil"
13 14 15

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
17 18 19
)

type fakeSession struct {
20 21 22
	interested            []cid.Cid
	blks                  []blocks.Block
	fromNetwork           bool
23 24 25 26
	receivedBlock         bool
	updateReceiveCounters bool
	id                    uint64
	pm                    *fakePeerManager
27
	srs                   *fakeRequestSplitter
28 29 30 31 32 33 34 35
}

// GetBlock is a stub: it ignores both arguments and returns a nil block and a
// nil error.
func (fs *fakeSession) GetBlock(_ context.Context, _ cid.Cid) (blocks.Block, error) {
	var none blocks.Block
	return none, nil
}
// GetBlocks is a stub: it ignores both arguments and returns a nil channel and
// a nil error.
func (fs *fakeSession) GetBlocks(_ context.Context, _ []cid.Cid) (<-chan blocks.Block, error) {
	var none <-chan blocks.Block
	return none, nil
}
36 37 38 39 40 41 42 43 44 45 46
// InterestedIn reports whether c is one of the CIDs stored in fs.interested.
func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
	for i := range fs.interested {
		if fs.interested[i] == c {
			return true
		}
	}
	return false
}
// ReceiveBlocksFrom appends blks to fs.blks so tests can later check which
// blocks reached this session. The sending peer is ignored.
func (fs *fakeSession) ReceiveBlocksFrom(_ peer.ID, blks []blocks.Block) {
	received := append(fs.blks, blks...)
	fs.blks = received
}
47 48 49 50 51 52

type fakePeerManager struct {
	id uint64
}

func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid)  {}
53
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
54
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
55 56
func (*fakePeerManager) RecordPeerResponse(peer.ID, []cid.Cid)   {}
func (*fakePeerManager) RecordCancels(c []cid.Cid)               {}
57

58 59 60
type fakeRequestSplitter struct {
}

61
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
62 63 64 65 66
	return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock()    {}

67
// nextInterestedIn is the CID list that sessionFactory assigns to the
// interested field of the next fakeSession it builds. Tests set it before
// calling NewSession to control which blocks that session accepts.
var nextInterestedIn []cid.Cid
68

69 70 71 72 73 74
func sessionFactory(ctx context.Context,
	id uint64,
	pm bssession.PeerManager,
	srs bssession.RequestSplitter,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) Session {
75
	return &fakeSession{
76 77 78 79
		interested: nextInterestedIn,
		id:         id,
		pm:         pm.(*fakePeerManager),
		srs:        srs.(*fakeRequestSplitter),
80 81 82 83 84 85 86
	}
}

// peerManagerFactory returns a fakePeerManager that remembers id. ctx is
// ignored.
func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
	fpm := &fakePeerManager{id: id}
	return fpm
}

87 88 89 90
// requestSplitterFactory returns a fresh fakeRequestSplitter. ctx is ignored.
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
	return new(fakeRequestSplitter)
}

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
// cmpSessionCids reports whether the blocks recorded by s match cids, as
// decided by cmpBlockCids.
func cmpSessionCids(s *fakeSession, cids []cid.Cid) bool {
	received := s.blks
	return cmpBlockCids(received, cids)
}

// cmpBlockCids reports whether blks and cids describe the same multiset of
// CIDs: equal length, with each CID occurring the same number of times in
// both. Order is ignored.
//
// Occurrences are counted rather than only checking membership, so a
// duplicated block cannot mask a missing one: blocks [A, A] do not match
// CIDs [A, B].
func cmpBlockCids(blks []blocks.Block, cids []cid.Cid) bool {
	if len(blks) != len(cids) {
		return false
	}
	// remaining[c] is how many more blocks with CID c are still expected.
	remaining := make(map[cid.Cid]int, len(cids))
	for _, c := range cids {
		remaining[c]++
	}
	for _, b := range blks {
		c := b.Cid()
		if remaining[c] == 0 {
			return false
		}
		remaining[c]--
	}
	// Lengths are equal and every block consumed one expected CID, so
	// nothing is left over.
	return true
}

113 114 115 116
func TestAddingSessions(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
117
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
118 119 120 121

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
122
	nextInterestedIn = []cid.Cid{block.Cid()}
123 124

	currentID := sm.GetNextSessionID()
125
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
126 127 128 129
	if firstSession.id != firstSession.pm.id ||
		firstSession.id != currentID+1 {
		t.Fatal("session does not have correct id set")
	}
130
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
131 132 133 134 135
	if secondSession.id != secondSession.pm.id ||
		secondSession.id != firstSession.id+1 {
		t.Fatal("session does not have correct id set")
	}
	sm.GetNextSessionID()
136
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
137 138 139 140
	if thirdSession.id != thirdSession.pm.id ||
		thirdSession.id != secondSession.id+2 {
		t.Fatal("session does not have correct id set")
	}
141 142 143 144
	sm.ReceiveBlocksFrom(p, []blocks.Block{block})
	if len(firstSession.blks) == 0 ||
		len(secondSession.blks) == 0 ||
		len(thirdSession.blks) == 0 {
145 146 147 148 149 150 151 152
		t.Fatal("should have received blocks but didn't")
	}
}

func TestReceivingBlocksWhenNotInterested(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
153
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
154 155

	p := peer.ID(123)
156 157 158 159 160 161 162
	blks := testutil.GenerateBlocksOfSize(3, 1024)
	var cids []cid.Cid
	for _, b := range blks {
		cids = append(cids, b.Cid())
	}

	nextInterestedIn = []cid.Cid{cids[0], cids[1]}
163
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
164
	nextInterestedIn = []cid.Cid{cids[0]}
165
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
166
	nextInterestedIn = []cid.Cid{}
167
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
168

169 170 171 172 173 174
	sm.ReceiveBlocksFrom(p, []blocks.Block{blks[0], blks[1]})

	if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
		!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
		!cmpSessionCids(thirdSession, []cid.Cid{}) {
		t.Fatal("did not receive correct blocks for sessions")
175 176 177 178 179 180
	}
}

func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
181
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
182 183 184 185

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
186
	nextInterestedIn = []cid.Cid{block.Cid()}
187 188 189
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
190 191 192 193

	cancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
194 195 196 197
	sm.ReceiveBlocksFrom(p, []blocks.Block{block})
	if len(firstSession.blks) > 0 ||
		len(secondSession.blks) > 0 ||
		len(thirdSession.blks) > 0 {
198 199 200 201 202 203 204 205
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
206
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
207 208 209 210

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
211
	nextInterestedIn = []cid.Cid{block.Cid()}
212
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
213
	sessionCtx, sessionCancel := context.WithCancel(ctx)
214 215
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
216 217 218 219

	sessionCancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
220 221 222 223
	sm.ReceiveBlocksFrom(p, []blocks.Block{block})
	if len(firstSession.blks) == 0 ||
		len(secondSession.blks) > 0 ||
		len(thirdSession.blks) == 0 {
224 225 226
		t.Fatal("received blocks for sessions that are canceled")
	}
}