diff --git a/bitswap.go b/bitswap.go index aab1429fa247899ad78b5209339851b843c103a3..db0ca0986de0cb7629dbc40129f5b045d37423d8 100644 --- a/bitswap.go +++ b/bitswap.go @@ -22,7 +22,6 @@ import ( bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" bssm "github.com/ipfs/go-bitswap/internal/sessionmanager" bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager" - bswm "github.com/ipfs/go-bitswap/internal/wantmanager" bsmsg "github.com/ipfs/go-bitswap/message" bsnet "github.com/ipfs/go-bitswap/network" blocks "github.com/ipfs/go-block-format" @@ -123,13 +122,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, return nil }) - var wm *bswm.WantManager // onDontHaveTimeout is called when a want-block is sent to a peer that // has an old version of Bitswap that doesn't support DONT_HAVE messages, // or when no response is received within a timeout. + var sm *bssm.SessionManager onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { - // Simulate a DONT_HAVE message arriving to the WantManager - wm.ReceiveFrom(ctx, p, nil, nil, dontHaves) + // Simulate a message arriving with DONT_HAVEs + sm.ReceiveFrom(ctx, p, nil, nil, dontHaves) } peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { return bsmq.New(ctx, p, network, onDontHaveTimeout) @@ -138,10 +137,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, sim := bssim.New() bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) - wm = bswm.New(ctx, pm, sim, bpm) pqm := bspqm.New(ctx, network) - sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager, + sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager, sim *bssim.SessionInterestManager, pm bssession.PeerManager, bpm *bsbpm.BlockPresenceManager, @@ -149,14 +147,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) bssm.Session { - return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) + return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { return bsspm.New(id, network.ConnectionManager()) } notif := notifications.New() - sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) - wm.SetSessionManager(sm) + sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self()) bs := &Bitswap{ @@ -166,7 +163,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan cid.Cid, HasBlockBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: wm, pm: pm, pqm: pqm, sm: sm, @@ -207,9 +203,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, // Bitswap instances implement the bitswap protocol. type Bitswap struct { - // the wantlist tracks global wants for bitswap - wm *bswm.WantManager - pm *bspm.PeerManager // the provider query manager manages requests to find providers @@ -357,7 +350,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b // Send all block keys (including duplicates) to any sessions that want them. // (The duplicates are needed by sessions for accounting purposes) - bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) + bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) // Send wanted blocks to decision engine bs.engine.ReceiveFrom(from, wanted, haves) @@ -480,14 +473,14 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool { // PeerConnected is called by the network interface // when a peer initiates a new connection to bitswap. func (bs *Bitswap) PeerConnected(p peer.ID) { - bs.wm.Connected(p) + bs.pm.Connected(p) bs.engine.PeerConnected(p) } // PeerDisconnected is called by the network interface when a peer // closes a connection func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.wm.Disconnected(p) + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } diff --git a/docs/go-bitswap.png b/docs/go-bitswap.png index 31dff2b85a71af71b056e0cdbaa12941d13dabf2..805bf6562a822c8cd68ed310ff49f07e4be45ba2 100644 Binary files a/docs/go-bitswap.png and b/docs/go-bitswap.png differ diff --git a/docs/go-bitswap.puml b/docs/go-bitswap.puml index 6a291dc3539d484b724b029ad0958bebbbf1e8a1..af9134d7e5a6e49e2fea51c4d919a8198ab6c83b 100644 --- a/docs/go-bitswap.puml +++ b/docs/go-bitswap.puml @@ -11,13 +11,6 @@ node "Sending Blocks" { [Engine] --> [TaskWorker (workers.go)] } -node "Requesting Blocks" { - [Bitswap] --* [WantManager] - [WantManager] --> [BlockPresenceManager] - [WantManager] --> [PeerManager] - [PeerManager] --* [MessageQueue] -} - node "Providing" { [Bitswap] --* [Provide Collector (workers.go)] [Provide Collector (workers.go)] --* [Provide Worker (workers.go)] @@ -31,14 +24,19 @@ node "Sessions (smart requests)" { [Bitswap] --* [SessionManager] [SessionManager] --> [SessionInterestManager] [SessionManager] --o [Session] + [SessionManager] --> [BlockPresenceManager] [Session] --* [sessionWantSender] [Session] --* [SessionPeerManager] - [Session] --> [WantManager] [Session] --> [ProvideQueryManager] [Session] --* [sessionWants] [Session] --> [SessionInterestManager] [sessionWantSender] --> [BlockPresenceManager] +} + +node "Requesting Blocks" { + [SessionManager] --> [PeerManager] [sessionWantSender] --> [PeerManager] + [PeerManager] --* [MessageQueue] } node "Network" { diff --git a/docs/how-bitswap-works.md b/docs/how-bitswap-works.md index 4b6ab1a74a63a5388a1c160995b1914b5a886775..303b05763ab757ff76b717bd0ca893956bd2f938 100644 --- a/docs/how-bitswap-works.md +++ b/docs/how-bitswap-works.md @@ -74,8 +74,8 @@ When a message is received, Bitswap So that the Engine can send responses to the wants - Informs the Engine of any received blocks So that the Engine can send the received blocks to any peers that want them -- Informs the WantManager of received blocks, HAVEs and DONT_HAVEs - So that the WantManager can inform interested sessions +- Informs the SessionManager of received blocks, HAVEs and DONT_HAVEs + So that the SessionManager can inform interested sessions When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`). @@ -101,9 +101,10 @@ The PeerTaskQueue prioritizes tasks such that the peers with the least amount of ### Requesting Blocks -When the WantManager is informed of a new message, it -- informs the SessionManager - The SessionManager informs the Sessions that are interested in the received blocks and wants +When the SessionManager is informed of a new message, it +- informs the BlockPresenceManager + The BlockPresenceManager keeps track of which peers have sent HAVES and DONT_HAVEs for each block +- informs the Sessions that are interested in the received blocks and wants - informs the PeerManager of received blocks The PeerManager checks if any wants were send to a peer for the received blocks. If so it sends a `CANCEL` message to those peers. @@ -114,7 +115,7 @@ The Session starts in "discovery" mode. This means it doesn't have any peers yet When the client initially requests blocks from a Session, the Session - informs the SessionInterestManager that it is interested in the want - informs the sessionWantManager of the want -- tells the WantManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block +- tells the PeerManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block - queries the ProviderQueryManager to discover which peers have the block When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session. diff --git a/internal/session/session.go b/internal/session/session.go index 34a7375c27dccdd017c23373d889f4fff00b23d9..11c8b09249ec585fc57bbad87f72ed246f2e366f 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -25,17 +25,6 @@ const ( broadcastLiveWantsLimit = 64 ) -// WantManager is an interface that can be used to request blocks -// from given peers. -type WantManager interface { - // BroadcastWantHaves sends want-haves to all connected peers (used for - // session discovery) - BroadcastWantHaves(context.Context, uint64, []cid.Cid) - // RemoveSession removes the session from the WantManager (when the - // session shuts down) - RemoveSession(context.Context, uint64) -} - // PeerManager keeps track of which sessions are interested in which peers // and takes care of sending wants for the sessions type PeerManager interface { @@ -47,6 +36,11 @@ type PeerManager interface { UnregisterSession(uint64) // SendWants tells the PeerManager to send wants to the given peer SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) + // BroadcastWantHaves sends want-haves to all connected peers (used for + // session discovery) + BroadcastWantHaves(context.Context, []cid.Cid) + // SendCancels tells the PeerManager to send cancels to all peers + SendCancels(context.Context, []cid.Cid) } // SessionPeerManager keeps track of peers in the session @@ -97,8 +91,10 @@ type op struct { // info to, and who to request blocks from. type Session struct { // dependencies - ctx context.Context - wm WantManager + bsctx context.Context // context for bitswap + ctx context.Context // context for session + pm PeerManager + bpm *bsbpm.BlockPresenceManager sprm SessionPeerManager providerFinder ProviderFinder sim *bssim.SessionInterestManager @@ -129,9 +125,10 @@ type Session struct { // New creates a new bitswap session whose lifetime is bounded by the // given context. -func New(ctx context.Context, +func New( + bsctx context.Context, // context for bitswap + ctx context.Context, // context for this session id uint64, - wm WantManager, sprm SessionPeerManager, providerFinder ProviderFinder, sim *bssim.SessionInterestManager, @@ -144,8 +141,10 @@ func New(ctx context.Context, s := &Session{ sw: newSessionWants(broadcastLiveWantsLimit), tickDelayReqs: make(chan time.Duration), + bsctx: bsctx, ctx: ctx, - wm: wm, + pm: pm, + bpm: bpm, sprm: sprm, providerFinder: providerFinder, sim: sim, @@ -301,13 +300,13 @@ func (s *Session) run(ctx context.Context) { s.sw.WantsSent(oper.keys) case opBroadcast: // Broadcast want-haves to all peers - s.broadcastWantHaves(ctx, oper.keys) + s.broadcast(ctx, oper.keys) default: panic("unhandled operation") } case <-s.idleTick.C: // The session hasn't received blocks for a while, broadcast - s.broadcastWantHaves(ctx, nil) + s.broadcast(ctx, nil) case <-s.periodicSearchTimer.C: // Periodically search for a random live want s.handlePeriodicSearch(ctx) @@ -325,7 +324,7 @@ func (s *Session) run(ctx context.Context) { // Called when the session hasn't received any blocks for some time, or when // all peers in the session have sent DONT_HAVE for a particular set of CIDs. // Send want-haves to all connected peers, and search for new peers with the CID. -func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { +func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) { // If this broadcast is because of an idle timeout (we haven't received // any blocks for a while) then broadcast all pending wants if wants == nil { @@ -333,7 +332,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { } // Broadcast a want-have for the live wants to everyone we're connected to - s.wm.BroadcastWantHaves(ctx, s.id, wants) + s.broadcastWantHaves(ctx, wants) // do not find providers on consecutive ticks // -- just rely on periodic search widening @@ -341,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { // Search for providers who have the first want in the list. // Typically if the provider has the first block they will have // the rest of the blocks also. - log.Debugf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants)) + log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants)) s.findMorePeers(ctx, wants[0]) } s.resetIdleTick() @@ -364,7 +363,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { // for new providers for blocks. s.findMorePeers(ctx, randomWant) - s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant}) + s.broadcastWantHaves(ctx, []cid.Cid{randomWant}) s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime()) } @@ -390,8 +389,19 @@ func (s *Session) handleShutdown() { // Shut down the sessionWantSender (blocks until sessionWantSender stops // sending) s.sws.Shutdown() - // Remove the session from the want manager - s.wm.RemoveSession(s.ctx, s.id) + + // Remove session's interest in the given blocks. + cancelKs := s.sim.RemoveSessionInterest(s.id) + + // Free up block presence tracking for keys that no session is interested + // in anymore + s.bpm.RemoveKeys(cancelKs) + + // Send CANCEL to all peers for blocks that no session is interested in + // anymore. + // Note: use bitswap context because session context has already been + // cancelled. + s.pm.SendCancels(s.bsctx, cancelKs) } // handleReceive is called when the session receives blocks from a peer @@ -439,11 +449,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) { // No peers discovered yet, broadcast some want-haves ks := s.sw.GetNextWants() if len(ks) > 0 { - log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks)) - s.wm.BroadcastWantHaves(ctx, s.id, ks) + log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks)) + s.broadcastWantHaves(ctx, ks) } } +// Send want-haves to all connected peers +func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { + log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants) + s.pm.BroadcastWantHaves(ctx, wants) +} + // The session will broadcast if it has outstanding wants and doesn't receive // any blocks for some time. // The length of time is calculated diff --git a/internal/session/session_test.go b/internal/session/session_test.go index d6f89e2dc05f6420781689b31c99e4a2590e5deb..79010db1fd4637dd4d21401d7d541149cf078b11 100644 --- a/internal/session/session_test.go +++ b/internal/session/session_test.go @@ -2,6 +2,7 @@ package session import ( "context" + "sync" "testing" "time" @@ -17,28 +18,6 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) -type wantReq struct { - cids []cid.Cid -} - -type fakeWantManager struct { - wantReqs chan wantReq -} - -func newFakeWantManager() *fakeWantManager { - return &fakeWantManager{ - wantReqs: make(chan wantReq, 1), - } -} - -func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid uint64, cids []cid.Cid) { - select { - case fwm.wantReqs <- wantReq{cids}: - case <-ctx.Done(): - } -} -func (fwm *fakeWantManager) RemoveSession(context.Context, uint64) {} - func newFakeSessionPeerManager() *bsspm.SessionPeerManager { return bsspm.New(1, newFakePeerTagger()) } @@ -76,11 +55,20 @@ func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid return make(chan peer.ID) } +type wantReq struct { + cids []cid.Cid +} + type fakePeerManager struct { + wantReqs chan wantReq + lk sync.Mutex + cancels []cid.Cid } func newFakePeerManager() *fakePeerManager { - return &fakePeerManager{} + return &fakePeerManager{ + wantReqs: make(chan wantReq, 1), + } } func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { @@ -88,19 +76,34 @@ func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { } func (pm *fakePeerManager) UnregisterSession(uint64) {} func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} +func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) { + select { + case pm.wantReqs <- wantReq{cids}: + case <-ctx.Done(): + } +} +func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { + pm.lk.Lock() + defer pm.lk.Unlock() + pm.cancels = append(pm.cancels, cancels...) +} +func (pm *fakePeerManager) allCancels() []cid.Cid { + pm.lk.Lock() + defer pm.lk.Unlock() + return append([]cid.Cid{}, pm.cancels...) +} func TestSessionGetBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) var cids []cid.Cid @@ -115,7 +118,7 @@ func TestSessionGetBlocks(t *testing.T) { } // Wait for initial want request - receivedWantReq := <-fwm.wantReqs + receivedWantReq := <-fpm.wantReqs // Should have registered session's interest in blocks intSes := sim.FilterSessionInterested(id, cids) @@ -138,7 +141,7 @@ func TestSessionGetBlocks(t *testing.T) { time.Sleep(10 * time.Millisecond) // Verify new peers were recorded - if !testutil.MatchPeersIgnoreOrder(fpm.Peers(), peers) { + if !testutil.MatchPeersIgnoreOrder(fspm.Peers(), peers) { t.Fatal("peers not recorded by the peer manager") } @@ -172,20 +175,30 @@ func TestSessionGetBlocks(t *testing.T) { if len(wanted) != len(blks)-1 { t.Fatal("session wants incorrect number of blocks") } + + // Shut down session + cancel() + + time.Sleep(10 * time.Millisecond) + + // Verify wants were cancelled + if len(fpm.allCancels()) != len(blks) { + t.Fatal("expected cancels to be sent for all wants") + } } func TestSessionFindMorePeers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond) defer cancel() - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") session.SetBaseTickDelay(200 * time.Microsecond) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) @@ -200,7 +213,7 @@ func TestSessionFindMorePeers(t *testing.T) { // The session should initially broadcast want-haves select { - case <-fwm.wantReqs: + case <-fpm.wantReqs: case <-ctx.Done(): t.Fatal("Did not make first want request ") } @@ -217,14 +230,14 @@ func TestSessionFindMorePeers(t *testing.T) { // The session should now time out waiting for a response and broadcast // want-haves again select { - case <-fwm.wantReqs: + case <-fpm.wantReqs: case <-ctx.Done(): t.Fatal("Did not make second want request ") } // The session should keep broadcasting periodically until it receives a response select { - case receivedWantReq := <-fwm.wantReqs: + case receivedWantReq := <-fpm.wantReqs: if len(receivedWantReq.cids) != broadcastLiveWantsLimit { t.Fatal("did not rebroadcast whole live list") } @@ -250,8 +263,8 @@ func TestSessionFindMorePeers(t *testing.T) { func TestSessionOnPeersExhausted(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() @@ -259,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5) var cids []cid.Cid @@ -273,7 +286,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { } // Wait for initial want request - receivedWantReq := <-fwm.wantReqs + receivedWantReq := <-fpm.wantReqs // Should have sent out broadcast request for wants if len(receivedWantReq.cids) != broadcastLiveWantsLimit { @@ -284,7 +297,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { session.onPeersExhausted(cids[len(cids)-2:]) // Wait for want request - receivedWantReq = <-fwm.wantReqs + receivedWantReq = <-fpm.wantReqs // Should have sent out broadcast request for wants if len(receivedWantReq.cids) != 2 { @@ -295,15 +308,15 @@ func TestSessionOnPeersExhausted(t *testing.T) { func TestSessionFailingToGetFirstBlock(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(4) var cids []cid.Cid @@ -318,14 +331,14 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // The session should initially broadcast want-haves select { - case <-fwm.wantReqs: + case <-fpm.wantReqs: case <-ctx.Done(): t.Fatal("Did not make first want request ") } // Verify a broadcast was made select { - case receivedWantReq := <-fwm.wantReqs: + case receivedWantReq := <-fpm.wantReqs: if len(receivedWantReq.cids) < len(cids) { t.Fatal("did not rebroadcast whole live list") } @@ -346,7 +359,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // Wait for another broadcast to occur select { - case receivedWantReq := <-fwm.wantReqs: + case receivedWantReq := <-fpm.wantReqs: if len(receivedWantReq.cids) < len(cids) { t.Fatal("did not rebroadcast whole live list") } @@ -357,7 +370,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // Wait for another broadcast to occur startTick = time.Now() select { - case receivedWantReq := <-fwm.wantReqs: + case receivedWantReq := <-fpm.wantReqs: if len(receivedWantReq.cids) < len(cids) { t.Fatal("did not rebroadcast whole live list") } @@ -374,7 +387,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { // Wait for another broadcast to occur startTick = time.Now() select { - case receivedWantReq := <-fwm.wantReqs: + case receivedWantReq := <-fpm.wantReqs: if len(receivedWantReq.cids) < len(cids) { t.Fatal("did not rebroadcast whole live list") } @@ -407,8 +420,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { } func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() @@ -418,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { // Create a new session with its own context sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - session := New(sessctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer timerCancel() @@ -450,8 +463,8 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { func TestSessionReceiveMessageAfterShutdown(t *testing.T) { ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) - fwm := newFakeWantManager() - fpm := newFakeSessionPeerManager() + fpm := newFakePeerManager() + fspm := newFakeSessionPeerManager() fpf := newFakeProviderFinder() sim := bssim.New() @@ -459,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(2) cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()} @@ -470,7 +483,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { } // Wait for initial want request - <-fwm.wantReqs + <-fpm.wantReqs // Shut down session cancelCtx() diff --git a/internal/session/sessionwantsender_test.go b/internal/session/sessionwantsender_test.go index b679e9c61c97bc17d6bd0f38a93dd9848584c7c6..3593009a387fdd90092fcef429575d9aeb662017 100644 --- a/internal/session/sessionwantsender_test.go +++ b/internal/session/sessionwantsender_test.go @@ -66,8 +66,9 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool { return true } -func (pm *mockPeerManager) UnregisterSession(sesid uint64) { -} +func (*mockPeerManager) UnregisterSession(uint64) {} +func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { pm.lk.Lock() diff --git a/internal/sessioninterestmanager/sessioninterestmanager.go b/internal/sessioninterestmanager/sessioninterestmanager.go index 46888c9ad8275cb1a98475305a078623a4643385..6e345b55ebe6072e35166b536501e18666d229a8 100644 --- a/internal/sessioninterestmanager/sessioninterestmanager.go +++ b/internal/sessioninterestmanager/sessioninterestmanager.go @@ -90,7 +90,7 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b return wantedBlks, notWantedBlks } -// When the WantManager receives a message is calls InterestedSessions() to +// When the SessionManager receives a message it calls InterestedSessions() to // find out which sessions are interested in the message. func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 { sim.lk.RLock() diff --git a/internal/sessionmanager/sessionmanager.go b/internal/sessionmanager/sessionmanager.go index f7382fad3b38f813c2534041a1cbe470f264f9cd..c69aa0417521a530c77e7bcd278a4ab3124b0770 100644 --- a/internal/sessionmanager/sessionmanager.go +++ b/internal/sessionmanager/sessionmanager.go @@ -109,8 +109,10 @@ func (sm *SessionManager) GetNextSessionID() uint64 { return sm.sessID } -func (sm *SessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []Session { - sessions := make([]Session, 0) +// ReceiveFrom is called when a new message is received +func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { + // Record block presence for HAVE / DONT_HAVE + sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves) // Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) { @@ -120,9 +122,9 @@ func (sm *SessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid if ok { sess.ReceiveFrom(p, blks, haves, dontHaves) - sessions = append(sessions, sess) } } - return sessions + // Send CANCEL to all peers with want-have / want-block + sm.peerManager.SendCancels(ctx, blks) } diff --git a/internal/sessionmanager/sessionmanager_test.go b/internal/sessionmanager/sessionmanager_test.go index 4e0152bb7c61db2b43d43eac8b1bd6df1dd3efe6..6fa118e7b461e1e8fe2be47570495d554545bd24 100644 --- a/internal/sessionmanager/sessionmanager_test.go +++ b/internal/sessionmanager/sessionmanager_test.go @@ -53,11 +53,16 @@ func (*fakeSesPeerManager) RemovePeer(peer.ID) bool { return false } func (*fakeSesPeerManager) HasPeers() bool { return false } type fakePeerManager struct { + cancels []cid.Cid } func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { return true } func (*fakePeerManager) UnregisterSession(uint64) {} func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} +func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { + fpm.cancels = append(fpm.cancels, cancels...) +} func sessionFactory(ctx context.Context, id uint64, @@ -101,26 +106,30 @@ func TestReceiveFrom(t *testing.T) { sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()}) sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()}) - sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) + sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) if len(firstSession.ks) == 0 || len(secondSession.ks) > 0 || len(thirdSession.ks) == 0 { t.Fatal("should have received blocks but didn't") } - sm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{}) + sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{}) if len(firstSession.wantBlocks) == 0 || len(secondSession.wantBlocks) > 0 || len(thirdSession.wantBlocks) == 0 { t.Fatal("should have received want-blocks but didn't") } - sm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()}) + sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()}) if len(firstSession.wantHaves) == 0 || len(secondSession.wantHaves) > 0 || len(thirdSession.wantHaves) == 0 { t.Fatal("should have received want-haves but didn't") } + + if len(pm.cancels) != 1 { + t.Fatal("should have sent cancel for received blocks") + } } func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) { @@ -150,7 +159,7 @@ func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) { // wait for sessions to get removed time.Sleep(10 * time.Millisecond) - sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) + sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) if len(firstSession.ks) > 0 || len(secondSession.ks) > 0 || len(thirdSession.ks) > 0 { @@ -186,7 +195,7 @@ func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) { // wait for sessions to get removed time.Sleep(10 * time.Millisecond) - sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) + sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) if len(firstSession.ks) == 0 || len(secondSession.ks) > 0 || len(thirdSession.ks) == 0 { diff --git a/internal/wantmanager/wantmanager.go b/internal/wantmanager/wantmanager.go deleted file mode 100644 index 539017a9d07da962139c20d9efd334f103595d51..0000000000000000000000000000000000000000 --- a/internal/wantmanager/wantmanager.go +++ /dev/null @@ -1,103 +0,0 @@ -package wantmanager - -import ( - "context" - - bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" - bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" - "github.com/ipfs/go-bitswap/internal/sessionmanager" - logging "github.com/ipfs/go-log" - - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" -) - -var log = logging.Logger("bitswap") - -// PeerHandler sends wants / cancels to other peers -type PeerHandler interface { - // Connected is called when a peer connects. - Connected(p peer.ID) - // Disconnected is called when a peer disconnects - Disconnected(p peer.ID) - // BroadcastWantHaves sends want-haves to all connected peers - BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) - // SendCancels sends cancels to all peers that had previously been sent - // a want-block or want-have for the given key - SendCancels(context.Context, []cid.Cid) -} - -// SessionManager receives incoming messages and distributes them to sessions -type SessionManager interface { - ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []sessionmanager.Session -} - -// WantManager -// - informs the SessionManager and BlockPresenceManager of incoming information -// and cancelled sessions -// - informs the PeerManager of connects and disconnects -type WantManager struct { - peerHandler PeerHandler - sim *bssim.SessionInterestManager - bpm *bsbpm.BlockPresenceManager - sm SessionManager -} - -// New initializes a new WantManager for a given context. -func New(ctx context.Context, peerHandler PeerHandler, sim *bssim.SessionInterestManager, bpm *bsbpm.BlockPresenceManager) *WantManager { - return &WantManager{ - peerHandler: peerHandler, - sim: sim, - bpm: bpm, - } -} - -func (wm *WantManager) SetSessionManager(sm SessionManager) { - wm.sm = sm -} - -// ReceiveFrom is called when a new message is received -func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { - // Record block presence for HAVE / DONT_HAVE - wm.bpm.ReceiveFrom(p, haves, dontHaves) - // Inform interested sessions - wm.sm.ReceiveFrom(p, blks, haves, dontHaves) - // Send CANCEL to all peers with want-have / want-block - wm.peerHandler.SendCancels(ctx, blks) -} - -// BroadcastWantHaves is called when want-haves should be broadcast to all -// connected peers (as part of session discovery) -func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) { - // TODO: Avoid calling broadcast through here. It doesn't fit with - // everything else this module does. - - log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves) - // Send want-haves to all peers - wm.peerHandler.BroadcastWantHaves(ctx, wantHaves) -} - -// RemoveSession is called when the session is shut down -func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) { - // Remove session's interest in the given blocks. - cancelKs := wm.sim.RemoveSessionInterest(ses) - - // Free up block presence tracking for keys that no session is interested - // in anymore - wm.bpm.RemoveKeys(cancelKs) - - // Send CANCEL to all peers for blocks that no session is interested in anymore - wm.peerHandler.SendCancels(ctx, cancelKs) -} - -// Connected is called when a new peer connects -func (wm *WantManager) Connected(p peer.ID) { - // Tell the peer handler that there is a new connection and give it the - // list of outstanding broadcast wants - wm.peerHandler.Connected(p) -} - -// Disconnected is called when a peer disconnects -func (wm *WantManager) Disconnected(p peer.ID) { - wm.peerHandler.Disconnected(p) -} diff --git a/internal/wantmanager/wantmanager_test.go b/internal/wantmanager/wantmanager_test.go deleted file mode 100644 index 9855eb30d71dde789ced3ad567d55f10e868e024..0000000000000000000000000000000000000000 --- a/internal/wantmanager/wantmanager_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package wantmanager - -import ( - "context" - "testing" - - bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager" - bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" - "github.com/ipfs/go-bitswap/internal/sessionmanager" - "github.com/ipfs/go-bitswap/internal/testutil" - - "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/peer" -) - -type fakePeerHandler struct { - lastBcstWants []cid.Cid - lastCancels []cid.Cid -} - -func (fph *fakePeerHandler) Connected(p peer.ID) { -} -func (fph *fakePeerHandler) Disconnected(p peer.ID) { - -} -func (fph *fakePeerHandler) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) { - fph.lastBcstWants = wantHaves -} -func (fph *fakePeerHandler) SendCancels(ctx context.Context, cancels []cid.Cid) { - fph.lastCancels = cancels -} - -type fakeSessionManager struct { -} - -func (*fakeSessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []sessionmanager.Session { - return nil -} - -func TestReceiveFrom(t *testing.T) { - ctx := context.Background() - ph := &fakePeerHandler{} - sim := bssim.New() - bpm := bsbpm.New() - wm := New(context.Background(), ph, sim, bpm) - sm := &fakeSessionManager{} - wm.SetSessionManager(sm) - - p := testutil.GeneratePeers(1)[0] - ks := testutil.GenerateCids(2) - haves := testutil.GenerateCids(2) - dontHaves := testutil.GenerateCids(2) - wm.ReceiveFrom(ctx, p, ks, haves, dontHaves) - - if !bpm.PeerHasBlock(p, haves[0]) { - t.Fatal("expected block presence manager to be invoked") - } - if !bpm.PeerDoesNotHaveBlock(p, dontHaves[0]) { - t.Fatal("expected block presence manager to be invoked") - } - if len(ph.lastCancels) != len(ks) { - t.Fatal("expected received blocks to be cancelled") - } -} - -func TestRemoveSession(t *testing.T) { - ctx := context.Background() - ph := &fakePeerHandler{} - sim := bssim.New() - bpm := bsbpm.New() - wm := New(context.Background(), ph, sim, bpm) - sm := &fakeSessionManager{} - wm.SetSessionManager(sm) - - // Record session interest in 2 keys for session 0 and 2 keys for session 1 - // with 1 overlapping key - cids := testutil.GenerateCids(3) - ses0 := uint64(0) - ses1 := uint64(1) - ses0ks := cids[:2] - ses1ks := cids[1:] - sim.RecordSessionInterest(ses0, ses0ks) - sim.RecordSessionInterest(ses1, ses1ks) - - // Receive HAVE for all keys - p := testutil.GeneratePeers(1)[0] - ks := []cid.Cid{} - haves := append(ses0ks, ses1ks...) - dontHaves := []cid.Cid{} - wm.ReceiveFrom(ctx, p, ks, haves, dontHaves) - - // Remove session 0 - wm.RemoveSession(ctx, ses0) - - // Expect session 0 interest to be removed and session 1 interest to be - // unchanged - if len(sim.FilterSessionInterested(ses0, ses0ks)[0]) != 0 { - t.Fatal("expected session 0 interest to be removed") - } - if len(sim.FilterSessionInterested(ses1, ses1ks)[0]) != len(ses1ks) { - t.Fatal("expected session 1 interest to be unchanged") - } - - // Should clear block presence for key that was in session 0 and not - // in session 1 - if bpm.PeerHasBlock(p, ses0ks[0]) { - t.Fatal("expected block presence manager to be cleared") - } - if !bpm.PeerHasBlock(p, ses0ks[1]) { - t.Fatal("expected block presence manager to be unchanged for overlapping key") - } - - // Should cancel key that was in session 0 and not session 1 - if len(ph.lastCancels) != 1 || !ph.lastCancels[0].Equals(cids[0]) { - t.Fatal("expected removed want-have to be cancelled") - } -}