sessionmanager_test.go 6.66 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10 11 12 13 14
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssession "github.com/ipfs/go-bitswap/internal/session"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
15 16 17

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
19 20 21
)

type fakeSession struct {
dirkmc's avatar
dirkmc committed
22 23 24 25 26 27
	ks         []cid.Cid
	wantBlocks []cid.Cid
	wantHaves  []cid.Cid
	id         uint64
	pm         *fakeSesPeerManager
	notif      notifications.PubSub
28 29 30 31 32 33 34 35
}

func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
	return nil, nil
}
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
	return nil, nil
}
dirkmc's avatar
dirkmc committed
36 37
func (fs *fakeSession) ID() uint64 {
	return fs.id
38
}
dirkmc's avatar
dirkmc committed
39
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
40
	fs.ks = append(fs.ks, ks...)
dirkmc's avatar
dirkmc committed
41 42
	fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
	fs.wantHaves = append(fs.wantHaves, wantHaves...)
43
}
44

dirkmc's avatar
dirkmc committed
45
type fakeSesPeerManager struct {
46 47
}

48 49 50 51 52 53
func (*fakeSesPeerManager) Peers() []peer.ID        { return nil }
func (*fakeSesPeerManager) PeersDiscovered() bool   { return false }
func (*fakeSesPeerManager) Shutdown()               {}
func (*fakeSesPeerManager) AddPeer(peer.ID) bool    { return false }
func (*fakeSesPeerManager) RemovePeer(peer.ID) bool { return false }
func (*fakeSesPeerManager) HasPeers() bool          { return false }
54

dirkmc's avatar
dirkmc committed
55
type fakePeerManager struct {
Dirk McCormick's avatar
Dirk McCormick committed
56
	cancels []cid.Cid
57 58
}

dirkmc's avatar
dirkmc committed
59 60 61
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool               { return true }
func (*fakePeerManager) UnregisterSession(uint64)                                 {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
Dirk McCormick's avatar
Dirk McCormick committed
62 63 64 65
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid)            {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
	fpm.cancels = append(fpm.cancels, cancels...)
}
66

67 68
func sessionFactory(ctx context.Context,
	id uint64,
dirkmc's avatar
dirkmc committed
69 70
	sprm bssession.SessionPeerManager,
	sim *bssim.SessionInterestManager,
71
	pm bssession.PeerManager,
dirkmc's avatar
dirkmc committed
72
	bpm *bsbpm.BlockPresenceManager,
73
	notif notifications.PubSub,
74
	provSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
75 76
	rebroadcastDelay delay.D,
	self peer.ID) Session {
77
	return &fakeSession{
dirkmc's avatar
dirkmc committed
78 79 80
		id:    id,
		pm:    sprm.(*fakeSesPeerManager),
		notif: notif,
81 82 83
	}
}

dirkmc's avatar
dirkmc committed
84 85
func peerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager {
	return &fakeSesPeerManager{}
86 87
}

dirkmc's avatar
dirkmc committed
88
func TestReceiveFrom(t *testing.T) {
89 90 91
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
92 93
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
94 95 96 97
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
98 99 100 101

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))

102 103 104
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
dirkmc's avatar
dirkmc committed
105 106 107 108

	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

Dirk McCormick's avatar
Dirk McCormick committed
109
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
110
	if len(firstSession.ks) == 0 ||
dirkmc's avatar
dirkmc committed
111
		len(secondSession.ks) > 0 ||
112
		len(thirdSession.ks) == 0 {
113 114 115
		t.Fatal("should have received blocks but didn't")
	}

Dirk McCormick's avatar
Dirk McCormick committed
116
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
dirkmc's avatar
dirkmc committed
117 118 119 120
	if len(firstSession.wantBlocks) == 0 ||
		len(secondSession.wantBlocks) > 0 ||
		len(thirdSession.wantBlocks) == 0 {
		t.Fatal("should have received want-blocks but didn't")
121 122
	}

Dirk McCormick's avatar
Dirk McCormick committed
123
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()})
dirkmc's avatar
dirkmc committed
124 125 126 127
	if len(firstSession.wantHaves) == 0 ||
		len(secondSession.wantHaves) > 0 ||
		len(thirdSession.wantHaves) == 0 {
		t.Fatal("should have received want-haves but didn't")
128
	}
Dirk McCormick's avatar
Dirk McCormick committed
129 130 131 132

	if len(pm.cancels) != 1 {
		t.Fatal("should have sent cancel for received blocks")
	}
133 134
}

dirkmc's avatar
dirkmc committed
135
func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) {
136 137
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
dirkmc's avatar
dirkmc committed
138
	defer cancel()
139 140
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
141 142 143 144
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
145 146 147

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
148

149 150 151
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
152

dirkmc's avatar
dirkmc committed
153 154 155 156
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

157
	cancel()
dirkmc's avatar
dirkmc committed
158

159 160
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
161

Dirk McCormick's avatar
Dirk McCormick committed
162
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
163 164 165
	if len(firstSession.ks) > 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) > 0 {
166 167 168 169
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

dirkmc's avatar
dirkmc committed
170
func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) {
171 172 173
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
174 175
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
176 177 178 179
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
180 181 182

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
183

184
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
185
	sessionCtx, sessionCancel := context.WithCancel(ctx)
186 187
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
188

dirkmc's avatar
dirkmc committed
189 190 191 192
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

193
	sessionCancel()
dirkmc's avatar
dirkmc committed
194

195 196
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
197

Dirk McCormick's avatar
Dirk McCormick committed
198
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
199 200 201
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) == 0 {
202 203 204
		t.Fatal("received blocks for sessions that are canceled")
	}
}