diff --git a/bitswap.go b/bitswap.go index f3320967f6a8cd31765ebb4edee94d2d1260e5da..db0ca0986de0cb7629dbc40129f5b045d37423d8 100644 --- a/bitswap.go +++ b/bitswap.go @@ -139,7 +139,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, pm := bspm.New(ctx, peerQueueFactory, network.Self()) pqm := bspqm.New(ctx, network) - sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager, + sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager, sim *bssim.SessionInterestManager, pm bssession.PeerManager, bpm *bsbpm.BlockPresenceManager, @@ -147,7 +147,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) bssm.Session { - return bssession.New(ctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) + return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { return bsspm.New(id, network.ConnectionManager()) diff --git a/internal/session/session.go b/internal/session/session.go index ef7798084d672dd13daee70cf08fc7fa13174557..11c8b09249ec585fc57bbad87f72ed246f2e366f 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -91,7 +91,8 @@ type op struct { // info to, and who to request blocks from. type Session struct { // dependencies - ctx context.Context + bsctx context.Context // context for bitswap + ctx context.Context // context for session pm PeerManager bpm *bsbpm.BlockPresenceManager sprm SessionPeerManager @@ -124,7 +125,9 @@ type Session struct { // New creates a new bitswap session whose lifetime is bounded by the // given context. -func New(ctx context.Context, +func New( + bsctx context.Context, // context for bitswap + ctx context.Context, // context for this session id uint64, sprm SessionPeerManager, providerFinder ProviderFinder, @@ -138,6 +141,7 @@ func New(ctx context.Context, s := &Session{ sw: newSessionWants(broadcastLiveWantsLimit), tickDelayReqs: make(chan time.Duration), + bsctx: bsctx, ctx: ctx, pm: pm, bpm: bpm, @@ -393,10 +397,11 @@ func (s *Session) handleShutdown() { // in anymore s.bpm.RemoveKeys(cancelKs) - // TODO: If the context is cancelled this won't actually send any CANCELs. - // We should use a longer lived context to send out these CANCELs. - // Send CANCEL to all peers for blocks that no session is interested in anymore - s.pm.SendCancels(s.ctx, cancelKs) + // Send CANCEL to all peers for blocks that no session is interested in + // anymore. + // Note: use bitswap context because session context has already been + // cancelled. + s.pm.SendCancels(s.bsctx, cancelKs) } // handleReceive is called when the session receives blocks from a peer diff --git a/internal/session/session_test.go b/internal/session/session_test.go index 194a1ec9688ae91bc30ee900092a2dd85653aaa2..79010db1fd4637dd4d21401d7d541149cf078b11 100644 --- a/internal/session/session_test.go +++ b/internal/session/session_test.go @@ -103,7 +103,7 @@ func TestSessionGetBlocks(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) var cids []cid.Cid @@ -198,7 +198,7 @@ func TestSessionFindMorePeers(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") session.SetBaseTickDelay(200 * time.Microsecond) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) @@ -272,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5) var cids []cid.Cid @@ -316,7 +316,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(4) var cids []cid.Cid @@ -431,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { // Create a new session with its own context sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - session := New(sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer timerCancel() @@ -472,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(2) cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}