sessionmanager_test.go 8.15 KB
Newer Older
1 2 3 4
package sessionmanager

import (
	"context"
5
	"fmt"
6
	"sync"
7 8 9
	"testing"
	"time"

10
	delay "github.com/ipfs/go-ipfs-delay"
11

12 13 14 15 16
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bspm "github.com/ipfs/go-bitswap/internal/peermanager"
	bssession "github.com/ipfs/go-bitswap/internal/session"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
17
	"github.com/ipfs/go-bitswap/internal/testutil"
18 19 20

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
21
	peer "github.com/libp2p/go-libp2p-core/peer"
22 23 24
)

type fakeSession struct {
dirkmc's avatar
dirkmc committed
25 26 27 28 29
	ks         []cid.Cid
	wantBlocks []cid.Cid
	wantHaves  []cid.Cid
	id         uint64
	pm         *fakeSesPeerManager
30
	sm         bssession.SessionManager
dirkmc's avatar
dirkmc committed
31
	notif      notifications.PubSub
32 33 34 35 36 37 38 39
}

// GetBlock is a stub: it ignores its arguments and returns a nil block and a
// nil error.
func (*fakeSession) GetBlock(_ context.Context, _ cid.Cid) (blocks.Block, error) {
	return nil, nil
}
// GetBlocks is a stub: it ignores its arguments and returns a nil channel and
// a nil error.
func (*fakeSession) GetBlocks(_ context.Context, _ []cid.Cid) (<-chan blocks.Block, error) {
	return nil, nil
}
dirkmc's avatar
dirkmc committed
40 41
// ID returns the value of the session's id field.
func (fs *fakeSession) ID() uint64 {
	return fs.id
}
dirkmc's avatar
dirkmc committed
43
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
44
	fs.ks = append(fs.ks, ks...)
dirkmc's avatar
dirkmc committed
45 46
	fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
	fs.wantHaves = append(fs.wantHaves, wantHaves...)
47
}
48 49 50
// Shutdown asks the session manager stored on the fake to remove the session
// identified by the fake's id.
func (fs *fakeSession) Shutdown() {
	manager, sessionID := fs.sm, fs.id
	manager.RemoveSession(sessionID)
}
51

dirkmc's avatar
dirkmc committed
52
type fakeSesPeerManager struct {
53 54
}

55 56 57 58 59 60 61
func (*fakeSesPeerManager) Peers() []peer.ID          { return nil }
func (*fakeSesPeerManager) PeersDiscovered() bool     { return false }
func (*fakeSesPeerManager) Shutdown()                 {}
func (*fakeSesPeerManager) AddPeer(peer.ID) bool      { return false }
func (*fakeSesPeerManager) RemovePeer(peer.ID) bool   { return false }
func (*fakeSesPeerManager) HasPeers() bool            { return false }
func (*fakeSesPeerManager) ProtectConnection(peer.ID) {}
62

dirkmc's avatar
dirkmc committed
63
type fakePeerManager struct {
64
	lk      sync.Mutex
Dirk McCormick's avatar
Dirk McCormick committed
65
	cancels []cid.Cid
66 67
}

68
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)                    {}
dirkmc's avatar
dirkmc committed
69 70
func (*fakePeerManager) UnregisterSession(uint64)                                 {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
Dirk McCormick's avatar
Dirk McCormick committed
71 72
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid)            {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
73 74
	fpm.lk.Lock()
	defer fpm.lk.Unlock()
Dirk McCormick's avatar
Dirk McCormick committed
75 76
	fpm.cancels = append(fpm.cancels, cancels...)
}
77 78 79 80 81
// cancelled returns the keys recorded by SendCancels so far. The slice is
// read while holding the lock and returned without copying.
func (fpm *fakePeerManager) cancelled() []cid.Cid {
	fpm.lk.Lock()
	recorded := fpm.cancels
	fpm.lk.Unlock()
	return recorded
}
82

83
func sessionFactory(ctx context.Context,
84
	sm bssession.SessionManager,
85
	id uint64,
dirkmc's avatar
dirkmc committed
86 87
	sprm bssession.SessionPeerManager,
	sim *bssim.SessionInterestManager,
88
	pm bssession.PeerManager,
dirkmc's avatar
dirkmc committed
89
	bpm *bsbpm.BlockPresenceManager,
90
	notif notifications.PubSub,
91
	provSearchDelay time.Duration,
dirkmc's avatar
dirkmc committed
92 93
	rebroadcastDelay delay.D,
	self peer.ID) Session {
94
	fs := &fakeSession{
dirkmc's avatar
dirkmc committed
95 96
		id:    id,
		pm:    sprm.(*fakeSesPeerManager),
97
		sm:    sm,
dirkmc's avatar
dirkmc committed
98
		notif: notif,
99
	}
100 101 102 103 104
	go func() {
		<-ctx.Done()
		sm.RemoveSession(fs.id)
	}()
	return fs
105 106
}

dirkmc's avatar
dirkmc committed
107 108
func peerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager {
	return &fakeSesPeerManager{}
109 110
}

dirkmc's avatar
dirkmc committed
111
func TestReceiveFrom(t *testing.T) {
112 113 114
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
115 116
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
117 118 119 120
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
121

122
	p := peer.ID(fmt.Sprint(123))
123 124
	block := blocks.NewBlock([]byte("block"))

125 126 127
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
dirkmc's avatar
dirkmc committed
128 129 130 131

	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

Dirk McCormick's avatar
Dirk McCormick committed
132
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
133
	if len(firstSession.ks) == 0 ||
dirkmc's avatar
dirkmc committed
134
		len(secondSession.ks) > 0 ||
135
		len(thirdSession.ks) == 0 {
136 137 138
		t.Fatal("should have received blocks but didn't")
	}

Dirk McCormick's avatar
Dirk McCormick committed
139
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
dirkmc's avatar
dirkmc committed
140 141 142 143
	if len(firstSession.wantBlocks) == 0 ||
		len(secondSession.wantBlocks) > 0 ||
		len(thirdSession.wantBlocks) == 0 {
		t.Fatal("should have received want-blocks but didn't")
144 145
	}

Dirk McCormick's avatar
Dirk McCormick committed
146
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()})
dirkmc's avatar
dirkmc committed
147 148 149 150
	if len(firstSession.wantHaves) == 0 ||
		len(secondSession.wantHaves) > 0 ||
		len(thirdSession.wantHaves) == 0 {
		t.Fatal("should have received want-haves but didn't")
151
	}
Dirk McCormick's avatar
Dirk McCormick committed
152

153
	if len(pm.cancelled()) != 1 {
Dirk McCormick's avatar
Dirk McCormick committed
154 155
		t.Fatal("should have sent cancel for received blocks")
	}
156 157
}

158
func TestReceiveBlocksWhenManagerShutdown(t *testing.T) {
159 160
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
dirkmc's avatar
dirkmc committed
161
	defer cancel()
162 163
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
164 165 166 167
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
168

169
	p := peer.ID(fmt.Sprint(123))
170
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
171

172 173 174
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
175

dirkmc's avatar
dirkmc committed
176 177 178 179
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

180
	sm.Shutdown()
dirkmc's avatar
dirkmc committed
181

182 183
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
184

Dirk McCormick's avatar
Dirk McCormick committed
185
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
186 187 188
	if len(firstSession.ks) > 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) > 0 {
189 190 191 192
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

dirkmc's avatar
dirkmc committed
193
func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) {
194
	ctx, cancel := context.WithCancel(context.Background())
195
	defer cancel()
196 197
	notif := notifications.New()
	defer notif.Shutdown()
dirkmc's avatar
dirkmc committed
198 199 200 201
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")
202

203
	p := peer.ID(fmt.Sprint(123))
204
	block := blocks.NewBlock([]byte("block"))
dirkmc's avatar
dirkmc committed
205

206
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
207
	sessionCtx, sessionCancel := context.WithCancel(ctx)
208 209
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
210

dirkmc's avatar
dirkmc committed
211 212 213 214
	sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(secondSession.ID(), []cid.Cid{block.Cid()})
	sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})

215
	sessionCancel()
dirkmc's avatar
dirkmc committed
216

217 218
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
dirkmc's avatar
dirkmc committed
219

Dirk McCormick's avatar
Dirk McCormick committed
220
	sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
221 222 223
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) == 0 {
224 225 226
		t.Fatal("received blocks for sessions that are canceled")
	}
}
227 228 229 230 231 232 233 234 235 236 237 238

func TestShutdown(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	notif := notifications.New()
	defer notif.Shutdown()
	sim := bssim.New()
	bpm := bsbpm.New()
	pm := &fakePeerManager{}
	sm := New(ctx, sessionFactory, sim, peerManagerFactory, bpm, pm, notif, "")

239
	p := peer.ID(fmt.Sprint(123))
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
	block := blocks.NewBlock([]byte("block"))
	cids := []cid.Cid{block.Cid()}
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	sim.RecordSessionInterest(firstSession.ID(), cids)
	sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, cids)

	if !bpm.HasKey(block.Cid()) {
		t.Fatal("expected cid to be added to block presence manager")
	}

	sm.Shutdown()

	// wait for cleanup
	time.Sleep(10 * time.Millisecond)

	if bpm.HasKey(block.Cid()) {
		t.Fatal("expected cid to be removed from block presence manager")
	}
	if !testutil.MatchKeysIgnoreOrder(pm.cancelled(), cids) {
		t.Fatal("expected cancels to be sent")
	}
}