Unverified Commit 56430042 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #374 from ipfs/refactor/unref-want-mgr

refactor: remove WantManager
parents ba0eb21b 26fbfbf0
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager" bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/internal/sessionmanager" bssm "github.com/ipfs/go-bitswap/internal/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager" bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/internal/wantmanager"
bsmsg "github.com/ipfs/go-bitswap/message" bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network" bsnet "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
...@@ -123,13 +122,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -123,13 +122,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil return nil
}) })
var wm *bswm.WantManager
// onDontHaveTimeout is called when a want-block is sent to a peer that // onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages, // has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout. // or when no response is received within a timeout.
var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a DONT_HAVE message arriving to the WantManager // Simulate a message arriving with DONT_HAVEs
wm.ReceiveFrom(ctx, p, nil, nil, dontHaves) sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
} }
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout) return bsmq.New(ctx, p, network, onDontHaveTimeout)
...@@ -138,10 +137,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -138,10 +137,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sim := bssim.New() sim := bssim.New()
bpm := bsbpm.New() bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self()) pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network) pqm := bspqm.New(ctx, network)
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager, sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager, sim *bssim.SessionInterestManager,
pm bssession.PeerManager, pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager, bpm *bsbpm.BlockPresenceManager,
...@@ -149,14 +147,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -149,14 +147,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provSearchDelay time.Duration, provSearchDelay time.Duration,
rebroadcastDelay delay.D, rebroadcastDelay delay.D,
self peer.ID) bssm.Session { self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
} }
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager()) return bsspm.New(id, network.ConnectionManager())
} }
notif := notifications.New() notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self()) engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())
bs := &Bitswap{ bs := &Bitswap{
...@@ -166,7 +163,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -166,7 +163,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px, process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize), newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm, pm: pm,
pqm: pqm, pqm: pqm,
sm: sm, sm: sm,
...@@ -207,9 +203,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -207,9 +203,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// Bitswap instances implement the bitswap protocol. // Bitswap instances implement the bitswap protocol.
type Bitswap struct { type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager
pm *bspm.PeerManager pm *bspm.PeerManager
// the provider query manager manages requests to find providers // the provider query manager manages requests to find providers
...@@ -357,7 +350,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b ...@@ -357,7 +350,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
// Send all block keys (including duplicates) to any sessions that want them. // Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes) // (The duplicates are needed by sessions for accounting purposes)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
// Send wanted blocks to decision engine // Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves) bs.engine.ReceiveFrom(from, wanted, haves)
...@@ -480,14 +473,14 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool { ...@@ -480,14 +473,14 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
// PeerConnected is called by the network interface // PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap. // when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) { func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p) bs.pm.Connected(p)
bs.engine.PeerConnected(p) bs.engine.PeerConnected(p)
} }
// PeerDisconnected is called by the network interface when a peer // PeerDisconnected is called by the network interface when a peer
// closes a connection // closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) { func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p) bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p) bs.engine.PeerDisconnected(p)
} }
......
docs/go-bitswap.png

82.9 KB | W: | H:

docs/go-bitswap.png

80 KB | W: | H:

docs/go-bitswap.png
docs/go-bitswap.png
docs/go-bitswap.png
docs/go-bitswap.png
  • 2-up
  • Swipe
  • Onion skin
...@@ -11,13 +11,6 @@ node "Sending Blocks" { ...@@ -11,13 +11,6 @@ node "Sending Blocks" {
[Engine] --> [TaskWorker (workers.go)] [Engine] --> [TaskWorker (workers.go)]
} }
node "Requesting Blocks" {
[Bitswap] --* [WantManager]
[WantManager] --> [BlockPresenceManager]
[WantManager] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}
node "Providing" { node "Providing" {
[Bitswap] --* [Provide Collector (workers.go)] [Bitswap] --* [Provide Collector (workers.go)]
[Provide Collector (workers.go)] --* [Provide Worker (workers.go)] [Provide Collector (workers.go)] --* [Provide Worker (workers.go)]
...@@ -31,14 +24,19 @@ node "Sessions (smart requests)" { ...@@ -31,14 +24,19 @@ node "Sessions (smart requests)" {
[Bitswap] --* [SessionManager] [Bitswap] --* [SessionManager]
[SessionManager] --> [SessionInterestManager] [SessionManager] --> [SessionInterestManager]
[SessionManager] --o [Session] [SessionManager] --o [Session]
[SessionManager] --> [BlockPresenceManager]
[Session] --* [sessionWantSender] [Session] --* [sessionWantSender]
[Session] --* [SessionPeerManager] [Session] --* [SessionPeerManager]
[Session] --> [WantManager]
[Session] --> [ProvideQueryManager] [Session] --> [ProvideQueryManager]
[Session] --* [sessionWants] [Session] --* [sessionWants]
[Session] --> [SessionInterestManager] [Session] --> [SessionInterestManager]
[sessionWantSender] --> [BlockPresenceManager] [sessionWantSender] --> [BlockPresenceManager]
}
node "Requesting Blocks" {
[SessionManager] --> [PeerManager]
[sessionWantSender] --> [PeerManager] [sessionWantSender] --> [PeerManager]
[PeerManager] --* [MessageQueue]
} }
node "Network" { node "Network" {
......
...@@ -74,8 +74,8 @@ When a message is received, Bitswap ...@@ -74,8 +74,8 @@ When a message is received, Bitswap
So that the Engine can send responses to the wants So that the Engine can send responses to the wants
- Informs the Engine of any received blocks - Informs the Engine of any received blocks
So that the Engine can send the received blocks to any peers that want them So that the Engine can send the received blocks to any peers that want them
- Informs the WantManager of received blocks, HAVEs and DONT_HAVEs - Informs the SessionManager of received blocks, HAVEs and DONT_HAVEs
So that the WantManager can inform interested sessions So that the SessionManager can inform interested sessions
When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`). When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`).
...@@ -101,9 +101,10 @@ The PeerTaskQueue prioritizes tasks such that the peers with the least amount of ...@@ -101,9 +101,10 @@ The PeerTaskQueue prioritizes tasks such that the peers with the least amount of
### Requesting Blocks ### Requesting Blocks
When the WantManager is informed of a new message, it When the SessionManager is informed of a new message, it
- informs the SessionManager - informs the BlockPresenceManager
The SessionManager informs the Sessions that are interested in the received blocks and wants The BlockPresenceManager keeps track of which peers have sent HAVES and DONT_HAVEs for each block
- informs the Sessions that are interested in the received blocks and wants
- informs the PeerManager of received blocks - informs the PeerManager of received blocks
The PeerManager checks if any wants were send to a peer for the received blocks. If so it sends a `CANCEL` message to those peers. The PeerManager checks if any wants were send to a peer for the received blocks. If so it sends a `CANCEL` message to those peers.
...@@ -114,7 +115,7 @@ The Session starts in "discovery" mode. This means it doesn't have any peers yet ...@@ -114,7 +115,7 @@ The Session starts in "discovery" mode. This means it doesn't have any peers yet
When the client initially requests blocks from a Session, the Session When the client initially requests blocks from a Session, the Session
- informs the SessionInterestManager that it is interested in the want - informs the SessionInterestManager that it is interested in the want
- informs the sessionWantManager of the want - informs the sessionWantManager of the want
- tells the WantManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block - tells the PeerManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- queries the ProviderQueryManager to discover which peers have the block - queries the ProviderQueryManager to discover which peers have the block
When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session. When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session.
......
...@@ -25,17 +25,6 @@ const ( ...@@ -25,17 +25,6 @@ const (
broadcastLiveWantsLimit = 64 broadcastLiveWantsLimit = 64
) )
// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, uint64, []cid.Cid)
// RemoveSession removes the session from the WantManager (when the
// session shuts down)
RemoveSession(context.Context, uint64)
}
// PeerManager keeps track of which sessions are interested in which peers // PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions // and takes care of sending wants for the sessions
type PeerManager interface { type PeerManager interface {
...@@ -47,6 +36,11 @@ type PeerManager interface { ...@@ -47,6 +36,11 @@ type PeerManager interface {
UnregisterSession(uint64) UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer // SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid)
} }
// SessionPeerManager keeps track of peers in the session // SessionPeerManager keeps track of peers in the session
...@@ -97,8 +91,10 @@ type op struct { ...@@ -97,8 +91,10 @@ type op struct {
// info to, and who to request blocks from. // info to, and who to request blocks from.
type Session struct { type Session struct {
// dependencies // dependencies
ctx context.Context bsctx context.Context // context for bitswap
wm WantManager ctx context.Context // context for session
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager sprm SessionPeerManager
providerFinder ProviderFinder providerFinder ProviderFinder
sim *bssim.SessionInterestManager sim *bssim.SessionInterestManager
...@@ -129,9 +125,10 @@ type Session struct { ...@@ -129,9 +125,10 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the // New creates a new bitswap session whose lifetime is bounded by the
// given context. // given context.
func New(ctx context.Context, func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
id uint64, id uint64,
wm WantManager,
sprm SessionPeerManager, sprm SessionPeerManager,
providerFinder ProviderFinder, providerFinder ProviderFinder,
sim *bssim.SessionInterestManager, sim *bssim.SessionInterestManager,
...@@ -144,8 +141,10 @@ func New(ctx context.Context, ...@@ -144,8 +141,10 @@ func New(ctx context.Context,
s := &Session{ s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit), sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration), tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx, ctx: ctx,
wm: wm, pm: pm,
bpm: bpm,
sprm: sprm, sprm: sprm,
providerFinder: providerFinder, providerFinder: providerFinder,
sim: sim, sim: sim,
...@@ -301,13 +300,13 @@ func (s *Session) run(ctx context.Context) { ...@@ -301,13 +300,13 @@ func (s *Session) run(ctx context.Context) {
s.sw.WantsSent(oper.keys) s.sw.WantsSent(oper.keys)
case opBroadcast: case opBroadcast:
// Broadcast want-haves to all peers // Broadcast want-haves to all peers
s.broadcastWantHaves(ctx, oper.keys) s.broadcast(ctx, oper.keys)
default: default:
panic("unhandled operation") panic("unhandled operation")
} }
case <-s.idleTick.C: case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast // The session hasn't received blocks for a while, broadcast
s.broadcastWantHaves(ctx, nil) s.broadcast(ctx, nil)
case <-s.periodicSearchTimer.C: case <-s.periodicSearchTimer.C:
// Periodically search for a random live want // Periodically search for a random live want
s.handlePeriodicSearch(ctx) s.handlePeriodicSearch(ctx)
...@@ -325,7 +324,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -325,7 +324,7 @@ func (s *Session) run(ctx context.Context) {
// Called when the session hasn't received any blocks for some time, or when // Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs. // all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID. // Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
// If this broadcast is because of an idle timeout (we haven't received // If this broadcast is because of an idle timeout (we haven't received
// any blocks for a while) then broadcast all pending wants // any blocks for a while) then broadcast all pending wants
if wants == nil { if wants == nil {
...@@ -333,7 +332,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { ...@@ -333,7 +332,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
} }
// Broadcast a want-have for the live wants to everyone we're connected to // Broadcast a want-have for the live wants to everyone we're connected to
s.wm.BroadcastWantHaves(ctx, s.id, wants) s.broadcastWantHaves(ctx, wants)
// do not find providers on consecutive ticks // do not find providers on consecutive ticks
// -- just rely on periodic search widening // -- just rely on periodic search widening
...@@ -341,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) { ...@@ -341,7 +340,7 @@ func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
// Search for providers who have the first want in the list. // Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have // Typically if the provider has the first block they will have
// the rest of the blocks also. // the rest of the blocks also.
log.Debugf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants)) log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
s.findMorePeers(ctx, wants[0]) s.findMorePeers(ctx, wants[0])
} }
s.resetIdleTick() s.resetIdleTick()
...@@ -364,7 +363,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { ...@@ -364,7 +363,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// for new providers for blocks. // for new providers for blocks.
s.findMorePeers(ctx, randomWant) s.findMorePeers(ctx, randomWant)
s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant}) s.broadcastWantHaves(ctx, []cid.Cid{randomWant})
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime()) s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
} }
...@@ -390,8 +389,19 @@ func (s *Session) handleShutdown() { ...@@ -390,8 +389,19 @@ func (s *Session) handleShutdown() {
// Shut down the sessionWantSender (blocks until sessionWantSender stops // Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending) // sending)
s.sws.Shutdown() s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id) // Remove session's interest in the given blocks.
cancelKs := s.sim.RemoveSessionInterest(s.id)
// Free up block presence tracking for keys that no session is interested
// in anymore
s.bpm.RemoveKeys(cancelKs)
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
} }
// handleReceive is called when the session receives blocks from a peer // handleReceive is called when the session receives blocks from a peer
...@@ -439,11 +449,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) { ...@@ -439,11 +449,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
// No peers discovered yet, broadcast some want-haves // No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants() ks := s.sw.GetNextWants()
if len(ks) > 0 { if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks)) log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks) s.broadcastWantHaves(ctx, ks)
} }
} }
// Send want-haves to all connected peers
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
s.pm.BroadcastWantHaves(ctx, wants)
}
// The session will broadcast if it has outstanding wants and doesn't receive // The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time. // any blocks for some time.
// The length of time is calculated // The length of time is calculated
......
...@@ -2,6 +2,7 @@ package session ...@@ -2,6 +2,7 @@ package session
import ( import (
"context" "context"
"sync"
"testing" "testing"
"time" "time"
...@@ -17,28 +18,6 @@ import ( ...@@ -17,28 +18,6 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
) )
type wantReq struct {
cids []cid.Cid
}
type fakeWantManager struct {
wantReqs chan wantReq
}
func newFakeWantManager() *fakeWantManager {
return &fakeWantManager{
wantReqs: make(chan wantReq, 1),
}
}
func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid uint64, cids []cid.Cid) {
select {
case fwm.wantReqs <- wantReq{cids}:
case <-ctx.Done():
}
}
func (fwm *fakeWantManager) RemoveSession(context.Context, uint64) {}
func newFakeSessionPeerManager() *bsspm.SessionPeerManager { func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
return bsspm.New(1, newFakePeerTagger()) return bsspm.New(1, newFakePeerTagger())
} }
...@@ -76,11 +55,20 @@ func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid ...@@ -76,11 +55,20 @@ func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid
return make(chan peer.ID) return make(chan peer.ID)
} }
type wantReq struct {
cids []cid.Cid
}
type fakePeerManager struct { type fakePeerManager struct {
wantReqs chan wantReq
lk sync.Mutex
cancels []cid.Cid
} }
func newFakePeerManager() *fakePeerManager { func newFakePeerManager() *fakePeerManager {
return &fakePeerManager{} return &fakePeerManager{
wantReqs: make(chan wantReq, 1),
}
} }
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool {
...@@ -88,19 +76,34 @@ func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { ...@@ -88,19 +76,34 @@ func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool {
} }
func (pm *fakePeerManager) UnregisterSession(uint64) {} func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
select {
case pm.wantReqs <- wantReq{cids}:
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
pm.lk.Lock()
defer pm.lk.Unlock()
pm.cancels = append(pm.cancels, cancels...)
}
func (pm *fakePeerManager) allCancels() []cid.Cid {
pm.lk.Lock()
defer pm.lk.Unlock()
return append([]cid.Cid{}, pm.cancels...)
}
func TestSessionGetBlocks(t *testing.T) { func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() fpm := newFakePeerManager()
fwm := newFakeWantManager() fspm := newFakeSessionPeerManager()
fpm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
bpm := bsbpm.New() bpm := bsbpm.New()
notif := notifications.New() notif := notifications.New()
defer notif.Shutdown() defer notif.Shutdown()
id := testutil.GenerateSessionID() id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator() blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid var cids []cid.Cid
...@@ -115,7 +118,7 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -115,7 +118,7 @@ func TestSessionGetBlocks(t *testing.T) {
} }
// Wait for initial want request // Wait for initial want request
receivedWantReq := <-fwm.wantReqs receivedWantReq := <-fpm.wantReqs
// Should have registered session's interest in blocks // Should have registered session's interest in blocks
intSes := sim.FilterSessionInterested(id, cids) intSes := sim.FilterSessionInterested(id, cids)
...@@ -138,7 +141,7 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -138,7 +141,7 @@ func TestSessionGetBlocks(t *testing.T) {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Verify new peers were recorded // Verify new peers were recorded
if !testutil.MatchPeersIgnoreOrder(fpm.Peers(), peers) { if !testutil.MatchPeersIgnoreOrder(fspm.Peers(), peers) {
t.Fatal("peers not recorded by the peer manager") t.Fatal("peers not recorded by the peer manager")
} }
...@@ -172,20 +175,30 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -172,20 +175,30 @@ func TestSessionGetBlocks(t *testing.T) {
if len(wanted) != len(blks)-1 { if len(wanted) != len(blks)-1 {
t.Fatal("session wants incorrect number of blocks") t.Fatal("session wants incorrect number of blocks")
} }
// Shut down session
cancel()
time.Sleep(10 * time.Millisecond)
// Verify wants were cancelled
if len(fpm.allCancels()) != len(blks) {
t.Fatal("expected cancels to be sent for all wants")
}
} }
func TestSessionFindMorePeers(t *testing.T) { func TestSessionFindMorePeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 900*time.Millisecond)
defer cancel() defer cancel()
fwm := newFakeWantManager() fpm := newFakePeerManager()
fpm := newFakeSessionPeerManager() fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
bpm := bsbpm.New() bpm := bsbpm.New()
notif := notifications.New() notif := notifications.New()
defer notif.Shutdown() defer notif.Shutdown()
id := testutil.GenerateSessionID() id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
session.SetBaseTickDelay(200 * time.Microsecond) session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator() blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
...@@ -200,7 +213,7 @@ func TestSessionFindMorePeers(t *testing.T) { ...@@ -200,7 +213,7 @@ func TestSessionFindMorePeers(t *testing.T) {
// The session should initially broadcast want-haves // The session should initially broadcast want-haves
select { select {
case <-fwm.wantReqs: case <-fpm.wantReqs:
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("Did not make first want request ") t.Fatal("Did not make first want request ")
} }
...@@ -217,14 +230,14 @@ func TestSessionFindMorePeers(t *testing.T) { ...@@ -217,14 +230,14 @@ func TestSessionFindMorePeers(t *testing.T) {
// The session should now time out waiting for a response and broadcast // The session should now time out waiting for a response and broadcast
// want-haves again // want-haves again
select { select {
case <-fwm.wantReqs: case <-fpm.wantReqs:
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("Did not make second want request ") t.Fatal("Did not make second want request ")
} }
// The session should keep broadcasting periodically until it receives a response // The session should keep broadcasting periodically until it receives a response
select { select {
case receivedWantReq := <-fwm.wantReqs: case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) != broadcastLiveWantsLimit { if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
...@@ -250,8 +263,8 @@ func TestSessionFindMorePeers(t *testing.T) { ...@@ -250,8 +263,8 @@ func TestSessionFindMorePeers(t *testing.T) {
func TestSessionOnPeersExhausted(t *testing.T) { func TestSessionOnPeersExhausted(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel() defer cancel()
fwm := newFakeWantManager() fpm := newFakePeerManager()
fpm := newFakeSessionPeerManager() fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
...@@ -259,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { ...@@ -259,7 +272,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
notif := notifications.New() notif := notifications.New()
defer notif.Shutdown() defer notif.Shutdown()
id := testutil.GenerateSessionID() id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator() blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5) blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5)
var cids []cid.Cid var cids []cid.Cid
...@@ -273,7 +286,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { ...@@ -273,7 +286,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
} }
// Wait for initial want request // Wait for initial want request
receivedWantReq := <-fwm.wantReqs receivedWantReq := <-fpm.wantReqs
// Should have sent out broadcast request for wants // Should have sent out broadcast request for wants
if len(receivedWantReq.cids) != broadcastLiveWantsLimit { if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
...@@ -284,7 +297,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { ...@@ -284,7 +297,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
session.onPeersExhausted(cids[len(cids)-2:]) session.onPeersExhausted(cids[len(cids)-2:])
// Wait for want request // Wait for want request
receivedWantReq = <-fwm.wantReqs receivedWantReq = <-fpm.wantReqs
// Should have sent out broadcast request for wants // Should have sent out broadcast request for wants
if len(receivedWantReq.cids) != 2 { if len(receivedWantReq.cids) != 2 {
...@@ -295,15 +308,15 @@ func TestSessionOnPeersExhausted(t *testing.T) { ...@@ -295,15 +308,15 @@ func TestSessionOnPeersExhausted(t *testing.T) {
func TestSessionFailingToGetFirstBlock(t *testing.T) { func TestSessionFailingToGetFirstBlock(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
fwm := newFakeWantManager() fpm := newFakePeerManager()
fpm := newFakeSessionPeerManager() fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
bpm := bsbpm.New() bpm := bsbpm.New()
notif := notifications.New() notif := notifications.New()
defer notif.Shutdown() defer notif.Shutdown()
id := testutil.GenerateSessionID() id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
blockGenerator := blocksutil.NewBlockGenerator() blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4) blks := blockGenerator.Blocks(4)
var cids []cid.Cid var cids []cid.Cid
...@@ -318,14 +331,14 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { ...@@ -318,14 +331,14 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
// The session should initially broadcast want-haves // The session should initially broadcast want-haves
select { select {
case <-fwm.wantReqs: case <-fpm.wantReqs:
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("Did not make first want request ") t.Fatal("Did not make first want request ")
} }
// Verify a broadcast was made // Verify a broadcast was made
select { select {
case receivedWantReq := <-fwm.wantReqs: case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) { if len(receivedWantReq.cids) < len(cids) {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
...@@ -346,7 +359,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { ...@@ -346,7 +359,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
// Wait for another broadcast to occur // Wait for another broadcast to occur
select { select {
case receivedWantReq := <-fwm.wantReqs: case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) { if len(receivedWantReq.cids) < len(cids) {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
...@@ -357,7 +370,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { ...@@ -357,7 +370,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
// Wait for another broadcast to occur // Wait for another broadcast to occur
startTick = time.Now() startTick = time.Now()
select { select {
case receivedWantReq := <-fwm.wantReqs: case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) { if len(receivedWantReq.cids) < len(cids) {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
...@@ -374,7 +387,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { ...@@ -374,7 +387,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
// Wait for another broadcast to occur // Wait for another broadcast to occur
startTick = time.Now() startTick = time.Now()
select { select {
case receivedWantReq := <-fwm.wantReqs: case receivedWantReq := <-fpm.wantReqs:
if len(receivedWantReq.cids) < len(cids) { if len(receivedWantReq.cids) < len(cids) {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
...@@ -407,8 +420,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { ...@@ -407,8 +420,8 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
} }
func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
fwm := newFakeWantManager() fpm := newFakePeerManager()
fpm := newFakeSessionPeerManager() fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
bpm := bsbpm.New() bpm := bsbpm.New()
...@@ -418,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { ...@@ -418,7 +431,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
// Create a new session with its own context // Create a new session with its own context
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
session := New(sessctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session := New(context.Background(), sessctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer timerCancel() defer timerCancel()
...@@ -450,8 +463,8 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { ...@@ -450,8 +463,8 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
func TestSessionReceiveMessageAfterShutdown(t *testing.T) { func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Millisecond)
fwm := newFakeWantManager() fpm := newFakePeerManager()
fpm := newFakeSessionPeerManager() fspm := newFakeSessionPeerManager()
fpf := newFakeProviderFinder() fpf := newFakeProviderFinder()
sim := bssim.New() sim := bssim.New()
...@@ -459,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { ...@@ -459,7 +472,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New() notif := notifications.New()
defer notif.Shutdown() defer notif.Shutdown()
id := testutil.GenerateSessionID() id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "") session := New(ctx, ctx, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator() blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2) blks := blockGenerator.Blocks(2)
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()} cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
...@@ -470,7 +483,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) { ...@@ -470,7 +483,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
} }
// Wait for initial want request // Wait for initial want request
<-fwm.wantReqs <-fpm.wantReqs
// Shut down session // Shut down session
cancelCtx() cancelCtx()
......
...@@ -66,8 +66,9 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool { ...@@ -66,8 +66,9 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool {
return true return true
} }
func (pm *mockPeerManager) UnregisterSession(sesid uint64) { func (*mockPeerManager) UnregisterSession(uint64) {}
} func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}
func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
pm.lk.Lock() pm.lk.Lock()
......
...@@ -90,7 +90,7 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b ...@@ -90,7 +90,7 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b
return wantedBlks, notWantedBlks return wantedBlks, notWantedBlks
} }
// When the WantManager receives a message is calls InterestedSessions() to // When the SessionManager receives a message it calls InterestedSessions() to
// find out which sessions are interested in the message. // find out which sessions are interested in the message.
func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 { func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 {
sim.lk.RLock() sim.lk.RLock()
......
...@@ -109,8 +109,10 @@ func (sm *SessionManager) GetNextSessionID() uint64 { ...@@ -109,8 +109,10 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return sm.sessID return sm.sessID
} }
func (sm *SessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []Session { // ReceiveFrom is called when a new message is received
sessions := make([]Session, 0) func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Record block presence for HAVE / DONT_HAVE
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)
// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs // Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) { for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
...@@ -120,9 +122,9 @@ func (sm *SessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid ...@@ -120,9 +122,9 @@ func (sm *SessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid
if ok { if ok {
sess.ReceiveFrom(p, blks, haves, dontHaves) sess.ReceiveFrom(p, blks, haves, dontHaves)
sessions = append(sessions, sess)
} }
} }
return sessions // Send CANCEL to all peers with want-have / want-block
sm.peerManager.SendCancels(ctx, blks)
} }
...@@ -53,11 +53,16 @@ func (*fakeSesPeerManager) RemovePeer(peer.ID) bool { return false } ...@@ -53,11 +53,16 @@ func (*fakeSesPeerManager) RemovePeer(peer.ID) bool { return false }
func (*fakeSesPeerManager) HasPeers() bool { return false } func (*fakeSesPeerManager) HasPeers() bool { return false }
type fakePeerManager struct { type fakePeerManager struct {
cancels []cid.Cid
} }
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { return true } func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool { return true }
func (*fakePeerManager) UnregisterSession(uint64) {} func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.cancels = append(fpm.cancels, cancels...)
}
func sessionFactory(ctx context.Context, func sessionFactory(ctx context.Context,
id uint64, id uint64,
...@@ -101,26 +106,30 @@ func TestReceiveFrom(t *testing.T) { ...@@ -101,26 +106,30 @@ func TestReceiveFrom(t *testing.T) {
sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()}) sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()}) sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})
sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
if len(firstSession.ks) == 0 || if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 || len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 { len(thirdSession.ks) == 0 {
t.Fatal("should have received blocks but didn't") t.Fatal("should have received blocks but didn't")
} }
sm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{}) sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
if len(firstSession.wantBlocks) == 0 || if len(firstSession.wantBlocks) == 0 ||
len(secondSession.wantBlocks) > 0 || len(secondSession.wantBlocks) > 0 ||
len(thirdSession.wantBlocks) == 0 { len(thirdSession.wantBlocks) == 0 {
t.Fatal("should have received want-blocks but didn't") t.Fatal("should have received want-blocks but didn't")
} }
sm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()}) sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{block.Cid()})
if len(firstSession.wantHaves) == 0 || if len(firstSession.wantHaves) == 0 ||
len(secondSession.wantHaves) > 0 || len(secondSession.wantHaves) > 0 ||
len(thirdSession.wantHaves) == 0 { len(thirdSession.wantHaves) == 0 {
t.Fatal("should have received want-haves but didn't") t.Fatal("should have received want-haves but didn't")
} }
if len(pm.cancels) != 1 {
t.Fatal("should have sent cancel for received blocks")
}
} }
func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) { func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) {
...@@ -150,7 +159,7 @@ func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) { ...@@ -150,7 +159,7 @@ func TestReceiveBlocksWhenManagerContextCancelled(t *testing.T) {
// wait for sessions to get removed // wait for sessions to get removed
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
if len(firstSession.ks) > 0 || if len(firstSession.ks) > 0 ||
len(secondSession.ks) > 0 || len(secondSession.ks) > 0 ||
len(thirdSession.ks) > 0 { len(thirdSession.ks) > 0 {
...@@ -186,7 +195,7 @@ func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) { ...@@ -186,7 +195,7 @@ func TestReceiveBlocksWhenSessionContextCancelled(t *testing.T) {
// wait for sessions to get removed // wait for sessions to get removed
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
sm.ReceiveFrom(p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{}) sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
if len(firstSession.ks) == 0 || if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 || len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 { len(thirdSession.ks) == 0 {
......
package wantmanager
import (
"context"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
"github.com/ipfs/go-bitswap/internal/sessionmanager"
logging "github.com/ipfs/go-log"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
var log = logging.Logger("bitswap")
// PeerHandler sends wants / cancels to other peers
type PeerHandler interface {
// Connected is called when a peer connects.
Connected(p peer.ID)
// Disconnected is called when a peer disconnects
Disconnected(p peer.ID)
// BroadcastWantHaves sends want-haves to all connected peers
BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid)
// SendCancels sends cancels to all peers that had previously been sent
// a want-block or want-have for the given key
SendCancels(context.Context, []cid.Cid)
}
// SessionManager receives incoming messages and distributes them to sessions
type SessionManager interface {
ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []sessionmanager.Session
}
// WantManager
// - informs the SessionManager and BlockPresenceManager of incoming information
// and cancelled sessions
// - informs the PeerManager of connects and disconnects
type WantManager struct {
peerHandler PeerHandler
sim *bssim.SessionInterestManager
bpm *bsbpm.BlockPresenceManager
sm SessionManager
}
// New initializes a new WantManager for a given context.
func New(ctx context.Context, peerHandler PeerHandler, sim *bssim.SessionInterestManager, bpm *bsbpm.BlockPresenceManager) *WantManager {
return &WantManager{
peerHandler: peerHandler,
sim: sim,
bpm: bpm,
}
}
func (wm *WantManager) SetSessionManager(sm SessionManager) {
wm.sm = sm
}
// ReceiveFrom is called when a new message is received
func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Record block presence for HAVE / DONT_HAVE
wm.bpm.ReceiveFrom(p, haves, dontHaves)
// Inform interested sessions
wm.sm.ReceiveFrom(p, blks, haves, dontHaves)
// Send CANCEL to all peers with want-have / want-block
wm.peerHandler.SendCancels(ctx, blks)
}
// BroadcastWantHaves is called when want-haves should be broadcast to all
// connected peers (as part of session discovery)
func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) {
// TODO: Avoid calling broadcast through here. It doesn't fit with
// everything else this module does.
log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
// Send want-haves to all peers
wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
}
// RemoveSession is called when the session is shut down
func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
// Remove session's interest in the given blocks.
cancelKs := wm.sim.RemoveSessionInterest(ses)
// Free up block presence tracking for keys that no session is interested
// in anymore
wm.bpm.RemoveKeys(cancelKs)
// Send CANCEL to all peers for blocks that no session is interested in anymore
wm.peerHandler.SendCancels(ctx, cancelKs)
}
// Connected is called when a new peer connects
func (wm *WantManager) Connected(p peer.ID) {
// Tell the peer handler that there is a new connection and give it the
// list of outstanding broadcast wants
wm.peerHandler.Connected(p)
}
// Disconnected is called when a peer disconnects
func (wm *WantManager) Disconnected(p peer.ID) {
wm.peerHandler.Disconnected(p)
}
package wantmanager
import (
"context"
"testing"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
"github.com/ipfs/go-bitswap/internal/sessionmanager"
"github.com/ipfs/go-bitswap/internal/testutil"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)
type fakePeerHandler struct {
lastBcstWants []cid.Cid
lastCancels []cid.Cid
}
func (fph *fakePeerHandler) Connected(p peer.ID) {
}
func (fph *fakePeerHandler) Disconnected(p peer.ID) {
}
func (fph *fakePeerHandler) BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid) {
fph.lastBcstWants = wantHaves
}
func (fph *fakePeerHandler) SendCancels(ctx context.Context, cancels []cid.Cid) {
fph.lastCancels = cancels
}
type fakeSessionManager struct {
}
func (*fakeSessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []sessionmanager.Session {
return nil
}
func TestReceiveFrom(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
sim := bssim.New()
bpm := bsbpm.New()
wm := New(context.Background(), ph, sim, bpm)
sm := &fakeSessionManager{}
wm.SetSessionManager(sm)
p := testutil.GeneratePeers(1)[0]
ks := testutil.GenerateCids(2)
haves := testutil.GenerateCids(2)
dontHaves := testutil.GenerateCids(2)
wm.ReceiveFrom(ctx, p, ks, haves, dontHaves)
if !bpm.PeerHasBlock(p, haves[0]) {
t.Fatal("expected block presence manager to be invoked")
}
if !bpm.PeerDoesNotHaveBlock(p, dontHaves[0]) {
t.Fatal("expected block presence manager to be invoked")
}
if len(ph.lastCancels) != len(ks) {
t.Fatal("expected received blocks to be cancelled")
}
}
func TestRemoveSession(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
sim := bssim.New()
bpm := bsbpm.New()
wm := New(context.Background(), ph, sim, bpm)
sm := &fakeSessionManager{}
wm.SetSessionManager(sm)
// Record session interest in 2 keys for session 0 and 2 keys for session 1
// with 1 overlapping key
cids := testutil.GenerateCids(3)
ses0 := uint64(0)
ses1 := uint64(1)
ses0ks := cids[:2]
ses1ks := cids[1:]
sim.RecordSessionInterest(ses0, ses0ks)
sim.RecordSessionInterest(ses1, ses1ks)
// Receive HAVE for all keys
p := testutil.GeneratePeers(1)[0]
ks := []cid.Cid{}
haves := append(ses0ks, ses1ks...)
dontHaves := []cid.Cid{}
wm.ReceiveFrom(ctx, p, ks, haves, dontHaves)
// Remove session 0
wm.RemoveSession(ctx, ses0)
// Expect session 0 interest to be removed and session 1 interest to be
// unchanged
if len(sim.FilterSessionInterested(ses0, ses0ks)[0]) != 0 {
t.Fatal("expected session 0 interest to be removed")
}
if len(sim.FilterSessionInterested(ses1, ses1ks)[0]) != len(ses1ks) {
t.Fatal("expected session 1 interest to be unchanged")
}
// Should clear block presence for key that was in session 0 and not
// in session 1
if bpm.PeerHasBlock(p, ses0ks[0]) {
t.Fatal("expected block presence manager to be cleared")
}
if !bpm.PeerHasBlock(p, ses0ks[1]) {
t.Fatal("expected block presence manager to be unchanged for overlapping key")
}
// Should cancel key that was in session 0 and not session 1
if len(ph.lastCancels) != 1 || !ph.lastCancels[0].Equals(cids[0]) {
t.Fatal("expected removed want-have to be cancelled")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment