Unverified Commit 26fbfbf0 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #375 from ipfs/refactor/sess-ctx-cancels

Send CANCELs when session context is cancelled
parents 02942c30 2ac2ed62
......@@ -139,7 +139,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
......@@ -147,7 +147,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
......
......@@ -91,7 +91,8 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
bsctx context.Context // context for bitswap
ctx context.Context // context for session
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
......@@ -124,7 +125,9 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
......@@ -138,6 +141,7 @@ func New(ctx context.Context,
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
pm: pm,
bpm: bpm,
......@@ -393,10 +397,11 @@ func (s *Session) handleShutdown() {
// in anymore
s.bpm.RemoveKeys(cancelKs)
// TODO: If the context is cancelled this won't actually send any CANCELs.
// We should use a longer lived context to send out these CANCELs.
// Send CANCEL to all peers for blocks that no session is interested in anymore
s.pm.SendCancels(s.ctx, cancelKs)
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
}
// handleReceive is called when the session receives blocks from a peer
......
......@@ -103,7 +103,7 @@ func TestSessionGetBlocks(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
......@@ -198,7 +198,7 @@ func TestSessionFindMorePeers(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
......@@ -272,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid
......@@ -316,7 +316,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
......@@ -431,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
// Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel()
......@@ -472,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment