sessionmanager_test.go 7.97 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssession "github.com/ipfs/go-bitswap/session"
12
	bssd "github.com/ipfs/go-bitswap/sessiondata"
13
	"github.com/ipfs/go-bitswap/testutil"
14 15 16

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
17
	peer "github.com/libp2p/go-libp2p-core/peer"
18 19 20
)

type fakeSession struct {
Steven Allen's avatar
Steven Allen committed
21
	interested []cid.Cid
22
	ks         []cid.Cid
Steven Allen's avatar
Steven Allen committed
23 24 25
	id         uint64
	pm         *fakePeerManager
	srs        *fakeRequestSplitter
26
	notif      notifications.PubSub
27 28 29 30 31 32 33 34
}

// GetBlock is a stub: it ignores its arguments and always returns a nil block
// together with a nil error.
func (*fakeSession) GetBlock(_ context.Context, _ cid.Cid) (blocks.Block, error) {
	return nil, nil
}
// GetBlocks is a stub: it ignores its arguments and always returns a nil
// channel together with a nil error.
func (*fakeSession) GetBlocks(_ context.Context, _ []cid.Cid) (<-chan blocks.Block, error) {
	return nil, nil
}
35 36 37 38 39 40 41 42
// InterestedIn reports whether c appears in the session's configured
// interest list.
func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
	for i := range fs.interested {
		if fs.interested[i] == c {
			return true
		}
	}
	return false
}
43 44 45
// IsWanted gives the same answer as InterestedIn: the fake makes no
// distinction between being interested in a CID and wanting it.
func (fs *fakeSession) IsWanted(c cid.Cid) bool {
	wanted := fs.InterestedIn(c)
	return wanted
}
46
// ReceiveFrom appends every CID in ks to fs.ks so tests can later check what
// was delivered. The sending peer p is not used.
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) {
	fs.ks = append(fs.ks, ks...)
}
49 50 51 52 53 54

// fakePeerManager is a stub peer manager. It only remembers the ID it was
// created with, which the tests compare against the owning session's ID.
type fakePeerManager struct {
	id uint64
}

func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid)  {}
55
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
56
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
57 58
func (*fakePeerManager) RecordPeerResponse(peer.ID, []cid.Cid)   {}
func (*fakePeerManager) RecordCancels(c []cid.Cid)               {}
59

60 61 62
// fakeRequestSplitter is a stateless stub request splitter; all of its
// methods are no-ops.
type fakeRequestSplitter struct {
}

63
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
64 65 66 67 68
	return nil
}
// RecordDuplicateBlock is a no-op; the fake keeps no statistics.
func (*fakeRequestSplitter) RecordDuplicateBlock() {}

// RecordUniqueBlock is a no-op; the fake keeps no statistics.
func (*fakeRequestSplitter) RecordUniqueBlock() {}

69
// nextInterestedIn is the interest list that sessionFactory copies into the
// next fakeSession it builds. Tests assign it before calling sm.NewSession.
var nextInterestedIn []cid.Cid
70

71 72 73 74
func sessionFactory(ctx context.Context,
	id uint64,
	pm bssession.PeerManager,
	srs bssession.RequestSplitter,
75
	notif notifications.PubSub,
76 77
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) Session {
78
	return &fakeSession{
79 80 81 82
		interested: nextInterestedIn,
		id:         id,
		pm:         pm.(*fakePeerManager),
		srs:        srs.(*fakeRequestSplitter),
83
		notif:      notif,
84 85 86 87 88 89 90
	}
}

// peerManagerFactory returns a fakePeerManager tagged with the given id. The
// context is not used.
func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
	pm := &fakePeerManager{id: id}
	return pm
}

91 92 93 94
// requestSplitterFactory returns a fresh fakeRequestSplitter. The context is
// not used.
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
	return new(fakeRequestSplitter)
}

95
func cmpSessionCids(s *fakeSession, cids []cid.Cid) bool {
96
	if len(s.ks) != len(cids) {
97 98
		return false
	}
99
	for _, bk := range s.ks {
100 101
		has := false
		for _, c := range cids {
102
			if c == bk {
103 104 105 106 107 108 109 110 111 112
				has = true
			}
		}
		if !has {
			return false
		}
	}
	return true
}

113 114 115 116
func TestAddingSessions(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
117 118 119
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
120 121 122 123

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
124
	nextInterestedIn = []cid.Cid{block.Cid()}
125 126

	currentID := sm.GetNextSessionID()
127
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
128 129 130 131
	if firstSession.id != firstSession.pm.id ||
		firstSession.id != currentID+1 {
		t.Fatal("session does not have correct id set")
	}
132
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
133 134 135 136 137
	if secondSession.id != secondSession.pm.id ||
		secondSession.id != firstSession.id+1 {
		t.Fatal("session does not have correct id set")
	}
	sm.GetNextSessionID()
138
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
139 140 141 142
	if thirdSession.id != thirdSession.pm.id ||
		thirdSession.id != secondSession.id+2 {
		t.Fatal("session does not have correct id set")
	}
143
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
144 145 146
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) == 0 ||
		len(thirdSession.ks) == 0 {
147 148 149 150 151 152 153 154
		t.Fatal("should have received blocks but didn't")
	}
}

func TestReceivingBlocksWhenNotInterested(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
155 156 157
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
158 159

	p := peer.ID(123)
160 161 162 163 164 165 166
	blks := testutil.GenerateBlocksOfSize(3, 1024)
	var cids []cid.Cid
	for _, b := range blks {
		cids = append(cids, b.Cid())
	}

	nextInterestedIn = []cid.Cid{cids[0], cids[1]}
167
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
168
	nextInterestedIn = []cid.Cid{cids[0]}
169
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
170
	nextInterestedIn = []cid.Cid{}
171
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
172

173
	sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()})
174 175 176 177 178

	if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
		!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
		!cmpSessionCids(thirdSession, []cid.Cid{}) {
		t.Fatal("did not receive correct blocks for sessions")
179 180 181
	}
}

182 183 184 185
func TestInterestedIn(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
186 187 188
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
189 190 191 192 193 194 195 196 197 198 199 200

	blks := testutil.GenerateBlocksOfSize(4, 1024)
	var cids []cid.Cid
	for _, b := range blks {
		cids = append(cids, b.Cid())
	}

	nextInterestedIn = []cid.Cid{cids[0], cids[1]}
	_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	nextInterestedIn = []cid.Cid{cids[0], cids[2]}
	_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)

201 202 203
	if !sm.IsWanted(cids[0]) ||
		!sm.IsWanted(cids[1]) ||
		!sm.IsWanted(cids[2]) {
204 205
		t.Fatal("expected interest but session manager was not interested")
	}
206
	if sm.IsWanted(cids[3]) {
207 208 209 210
		t.Fatal("expected no interest but session manager was interested")
	}
}

211 212 213
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
214 215 216
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
217 218 219 220

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
221
	nextInterestedIn = []cid.Cid{block.Cid()}
222 223 224
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
225 226 227 228

	cancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
229
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
230 231 232
	if len(firstSession.ks) > 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) > 0 {
233 234 235 236 237 238 239 240
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
241 242 243
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
244 245 246 247

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
248
	nextInterestedIn = []cid.Cid{block.Cid()}
249
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
250
	sessionCtx, sessionCancel := context.WithCancel(ctx)
251 252
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
253 254 255 256

	sessionCancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
257
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
258 259 260
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) == 0 {
261 262 263
		t.Fatal("received blocks for sessions that are canceled")
	}
}