sessionmanager_test.go 8.11 KB
Newer Older
1 2 3 4
package sessionmanager

import (
	"context"
5
	"sync"
6 7 8
	"testing"
	"time"

9
	delay "github.com/ipfs/go-ipfs-delay"
10

11 12 13 14 15
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssession "github.com/ipfs/go-bitswap/internal/session"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
16
	"github.com/ipfs/go-bitswap/internal/testutil"
17 18 19

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
20
	peer "github.com/libp2p/go-libp2p-core/peer"
21 22 23
)

type fakeSession struct {
dirkmc's avatar
dirkmc committed
24 25 26 27 28
	ks         []cid.Cid
	wantBlocks []cid.Cid
	wantHaves  []cid.Cid
	id         uint64
	pm         *fakeSesPeerManager
29
	sm         bssession.SessionManager
dirkmc's avatar
dirkmc committed
30
	notif      notifications.PubSub
31 32 33 34 35 36 37 38
}

// GetBlock is a stub: it ignores its arguments and returns a nil block and a
// nil error.
func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
	var (
		blk blocks.Block
		err error
	)
	return blk, err
}
// GetBlocks is a stub: it ignores its arguments and returns a nil channel and
// a nil error.
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
	var out <-chan blocks.Block
	return out, nil
}
dirkmc's avatar
dirkmc committed
39 40
// ID returns the id stored on the fake session.
func (fs *fakeSession) ID() uint64 {
	return fs.id
}
dirkmc's avatar
dirkmc committed
42
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
43
	fs.ks = append(fs.ks, ks...)
dirkmc's avatar
dirkmc committed
44 45
	fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
	fs.wantHaves = append(fs.wantHaves, wantHaves...)
46
}
47 48 49
// Shutdown asks the session manager to remove this session by id.
func (fs *fakeSession) Shutdown() {
	id := fs.id
	fs.sm.RemoveSession(id)
}
50

dirkmc's avatar
dirkmc committed
51
// fakeSesPeerManager is a stateless test double for a session peer manager.
type fakeSesPeerManager struct{}

54 55 56 57 58 59 60
// Peers always reports no peers.
func (*fakeSesPeerManager) Peers() []peer.ID {
	return nil
}

// PeersDiscovered always reports false.
func (*fakeSesPeerManager) PeersDiscovered() bool {
	return false
}

// Shutdown is a no-op.
func (*fakeSesPeerManager) Shutdown() {
}

// AddPeer ignores the peer and always reports false.
func (*fakeSesPeerManager) AddPeer(peer.ID) bool {
	return false
}

// RemovePeer ignores the peer and always reports false.
func (*fakeSesPeerManager) RemovePeer(peer.ID) bool {
	return false
}

// HasPeers always reports false.
func (*fakeSesPeerManager) HasPeers() bool {
	return false
}

// ProtectConnection is a no-op.
func (*fakeSesPeerManager) ProtectConnection(peer.ID) {
}
61

dirkmc's avatar
dirkmc committed
62
type fakePeerManager struct {
63
	lk      sync.Mutex
Dirk McCormick's avatar
Dirk McCormick committed
64
	cancels []cid.Cid
65 66
}

dirkmc's avatar
dirkmc committed
67 68 69
// RegisterSession ignores its arguments and always reports true.
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool {
	return true
}

// UnregisterSession is a no-op.
func (*fakePeerManager) UnregisterSession(uint64) {
}

// SendWants is a no-op.
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {
}
Dirk McCormick's avatar
Dirk McCormick committed
70 71
// BroadcastWantHaves is a no-op.
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {
}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
72 73
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
Dirk McCormick's avatar
Dirk McCormick committed
74 75
	fpm.cancels = append(fpm.cancels, cancels...)
}
76 77 78 79 80
// cancelled returns the keys recorded by SendCancels so far. The slice header
// is read while holding the mutex.
func (fpm *fakePeerManager) cancelled() []cid.Cid {
	fpm.lk.Lock()
	recorded := fpm.cancels
	fpm.lk.Unlock()
	return recorded
}
81

82
func sessionFactory(ctx context.Context,
83
	sm bssession.SessionManager,
84
	id uint64,
dirkmc's avatar
dirkmc committed
85 86
	sprm bssession.SessionPeerManager,
	sim *bssim.SessionInterestManager,
87
	pm bssession.PeerManager,
dirkmc's avatar
dirkmc committed
88
	bpm *bsbpm.BlockPresenceManager,
89
	notif notifications.PubSub,
90
	provSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
91 92
	rebroadcastDelay delay.D,
	self peer.ID) Session {
93
	fs := &fakeSession{
dirkmc's avatar
dirkmc committed
94 95
		id:    id,
		pm:    sprm.(*fakeSesPeerManager),
96
		sm:    sm,
dirkmc's avatar
dirkmc committed
97
		notif: notif,
98
	}
99 100 101 102 103
	go func() {
		<-ctx.Done()
		sm.RemoveSession(fs.id)
	}()
	return fs
104 105
}

dirkmc's avatar
dirkmc committed
106 107
func peerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager {
	return &fakeSesPeerManager{}
108 109
}

dirkmc's avatar
dirkmc committed
110
func TestReceiveFrom(t *testing.T) {
111 112 113
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
114 115
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
116 117 118 119
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
120 121 122 123

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))

124 125 126
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
dirkmc's avatar
dirkmc committed
127 128 129 130

	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

Dirk McCormick's avatar
Dirk McCormick committed
131
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
132
	if len(firstSession.ks) == 0 ||
dirkmc's avatar
dirkmc committed
133
		len(secondSession.ks) > 0 ||
134
		len(thirdSession.ks) == 0 {
135 136 137
		t.Fatal("should have received blocks but didn't")
	}

Dirk McCormick's avatar
Dirk McCormick committed
138
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
dirkmc's avatar
dirkmc committed
139 140 141 142
	if len(firstSession.wantBlocks) == 0 ||
		len(secondSession.wantBlocks) > 0 ||
		len(thirdSession.wantBlocks) == 0 {
		t.Fatal("should have received want-blocks but didn't")
143 144
	}

Dirk McCormick's avatar
Dirk McCormick committed
145
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()})
dirkmc's avatar
dirkmc committed
146 147 148 149
	if len(firstSession.wantHaves) == 0 ||
		len(secondSession.wantHaves) > 0 ||
		len(thirdSession.wantHaves) == 0 {
		t.Fatal("should have received want-haves but didn't")
150
	}
Dirk McCormick's avatar
Dirk McCormick committed
151

152
	if len(pm.cancelled()) != 1 {
Dirk McCormick's avatar
Dirk McCormick committed
153 154
		t.Fatal("should have sent cancel for received blocks")
	}
155 156
}

157
func TestReceiveBlocksWhenManagerShutdown(t *testing.T) {
158 159
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
dirkmc's avatar
dirkmc committed
160
	defer cancel()
161 162
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
163 164 165 166
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
167 168 169

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
170

171 172 173
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
174

dirkmc's avatar
dirkmc committed
175 176 177 178
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

179
	sm.Shutdown()
dirkmc's avatar
dirkmc committed
180

181 182
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
183

Dirk McCormick's avatar
Dirk McCormick committed
184
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
185 186 187
	if len(firstSession.ks) > 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) > 0 {
188 189 190 191
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

dirkmc's avatar
dirkmc committed
192
func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) {
193
	ctx, cancel := context.WithCancel(context.Background())
194
	defer cancel()
195 196
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
197 198 199 200
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
201 202 203

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
204

205
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
206
	sessionCtx, sessionCancel := context.WithCancel(ctx)
207 208
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
209

dirkmc's avatar
dirkmc committed
210 211 212 213
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

214
	sessionCancel()
dirkmc's avatar
dirkmc committed
215

216 217
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
218

Dirk McCormick's avatar
Dirk McCormick committed
219
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
220 221 222
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) == 0 {
223 224 225
		t.Fatal("received blocks for sessions that are canceled")
	}
}
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

// TestShutdown checks that after the manager is shut down, a key previously
// passed to ReceiveFrom is no longer present in the block presence manager
// and that the fake peer manager recorded cancels matching that key.
func TestShutdown(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	notif := notifications.New()
	defer notif.Shutdown()
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")

	// Use a string literal: peer.ID(123) is an integer-to-string conversion
	// (it yields "{"), which go vet reports.
	p := peer.ID("123")
	block := blocks.NewBlock([]byte("block"))
	cids := []cid.Cid{block.Cid()}
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	sim.RecordSessionInterest(firstSession.ID(), cids)
	// The key is passed in the last key list only.
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, cids)

	if !bpm.HasKey(block.Cid()) {
		t.Fatal("expected cid to be added to block presence manager")
	}

	sm.Shutdown()

	// wait for cleanup
	time.Sleep(10 * time.Millisecond)

	if bpm.HasKey(block.Cid()) {
		t.Fatal("expected cid to be removed from block presence manager")
	}
	if !testutil.MatchKeysIgnoreOrder(pm.cancelled(), cids) {
		t.Fatal("expected cancels to be sent")
	}
}