Commit 0bd2ede0 authored by Dirk McCormick's avatar Dirk McCormick Committed by Steven Allen

refactor: use global pubsub notifier

parent a5fe0d4b
......@@ -16,6 +16,7 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bspm "github.com/ipfs/go-bitswap/peermanager"
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/session"
......@@ -116,9 +117,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pqm := bspqm.New(ctx, network)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay)
return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
......@@ -126,6 +128,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
notif := notifications.New()
bs := &Bitswap{
blockstore: bstore,
......@@ -136,7 +139,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
......@@ -163,6 +167,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
go func() {
<-px.Closing() // process closes first
cancelFunc()
notif.Shutdown()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first
......@@ -187,6 +192,9 @@ type Bitswap struct {
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// manages channels of outgoing blocks for sessions
notif notifications.PubSub
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
......@@ -314,6 +322,13 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// Send wanted blocks to decision engine
bs.engine.AddBlocks(wanted)
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of received
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
}
// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, b := range wanted {
......
......@@ -101,6 +101,7 @@ func New(ctx context.Context,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
notif notifications.PubSub,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D) *Session {
s := &Session{
......@@ -117,7 +118,7 @@ func New(ctx context.Context,
pm: pm,
srs: srs,
incoming: make(chan blksRecv),
notif: notifications.New(),
notif: notif,
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
......@@ -359,7 +360,6 @@ func (s *Session) randomLiveWant() cid.Cid {
}
func (s *Session) handleShutdown() {
s.idleTick.Stop()
s.notif.Shutdown()
live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants {
......@@ -395,8 +395,6 @@ func (s *Session) receiveBlocks(ctx context.Context, blocks []blocks.Block) {
// that have occurred since the last new block
s.consecutiveTicks = 0
s.notif.Publish(blk)
// Keep track of CIDs we've successfully fetched
s.pastWants.Push(c)
}
......
......@@ -6,6 +6,7 @@ import (
"testing"
"time"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
......@@ -92,8 +93,10 @@ func TestSessionGetBlocks(t *testing.T) {
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
session := New(ctx, id, fwm, fpm, frs, notif, time.Second, delay.Fixed(time.Minute))
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
......@@ -122,7 +125,13 @@ func TestSessionGetBlocks(t *testing.T) {
var newBlockReqs []wantReq
var receivedBlocks []blocks.Block
for i, p := range peers {
session.ReceiveBlocksFrom(p, []blocks.Block{blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]})
// simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
notif.Publish(blk)
select {
case cancelBlock := <-cancelReqs:
newCancelReqs = append(newCancelReqs, cancelBlock)
......@@ -178,7 +187,13 @@ func TestSessionGetBlocks(t *testing.T) {
// receive remaining blocks
for i, p := range peers {
session.ReceiveBlocksFrom(p, []blocks.Block{blks[testutil.IndexOf(blks, newCidsRequested[i])]})
// simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, newCidsRequested[i])]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
notif.Publish(blk)
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
......@@ -207,8 +222,10 @@ func TestSessionFindMorePeers(t *testing.T) {
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
frs := &fakeRequestSplitter{}
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
session := New(ctx, id, fwm, fpm, frs, notif, time.Second, delay.Fixed(time.Minute))
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
......@@ -233,7 +250,13 @@ func TestSessionFindMorePeers(t *testing.T) {
// or there will be no tick set -- time precision on Windows in go is in the
// millisecond range
p := testutil.GeneratePeers(1)[0]
session.ReceiveBlocksFrom(p, []blocks.Block{blks[0]})
// simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[0]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
notif.Publish(blk)
select {
case <-cancelReqs:
case <-ctx.Done():
......@@ -279,9 +302,11 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
frs := &fakeRequestSplitter{}
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs, 10*time.Millisecond, delay.Fixed(100*time.Millisecond))
session := New(ctx, id, fwm, fpm, frs, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond))
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
......
......@@ -9,6 +9,7 @@ import (
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
notifications "github.com/ipfs/go-bitswap/notifications"
bssession "github.com/ipfs/go-bitswap/session"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-core/peer"
......@@ -28,7 +29,7 @@ type sesTrk struct {
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
......@@ -43,6 +44,7 @@ type SessionManager struct {
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
requestSplitterFactory RequestSplitterFactory
notif notifications.PubSub
// Sessions
sessLk sync.Mutex
......@@ -54,12 +56,14 @@ type SessionManager struct {
}
// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory,
requestSplitterFactory RequestSplitterFactory, notif notifications.PubSub) *SessionManager {
return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
requestSplitterFactory: requestSplitterFactory,
notif: notif,
}
}
......@@ -73,7 +77,7 @@ func (sm *SessionManager) NewSession(ctx context.Context,
pm := sm.peerManagerFactory(sessionctx, id)
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs, provSearchDelay, rebroadcastDelay)
session := sm.sessionFactory(sessionctx, id, pm, srs, sm.notif, provSearchDelay, rebroadcastDelay)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
......
......@@ -7,6 +7,7 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
notifications "github.com/ipfs/go-bitswap/notifications"
bssession "github.com/ipfs/go-bitswap/session"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
......@@ -22,6 +23,7 @@ type fakeSession struct {
id uint64
pm *fakePeerManager
srs *fakeRequestSplitter
notif notifications.PubSub
}
func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
......@@ -67,6 +69,7 @@ func sessionFactory(ctx context.Context,
id uint64,
pm bssession.PeerManager,
srs bssession.RequestSplitter,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) Session {
return &fakeSession{
......@@ -74,6 +77,7 @@ func sessionFactory(ctx context.Context,
id: id,
pm: pm.(*fakePeerManager),
srs: srs.(*fakeRequestSplitter),
notif: notif,
}
}
......@@ -111,7 +115,9 @@ func TestAddingSessions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
notif := notifications.New()
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -147,7 +153,9 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
notif := notifications.New()
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
p := peer.ID(123)
blks := testutil.GenerateBlocksOfSize(3, 1024)
......@@ -175,7 +183,9 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
notif := notifications.New()
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -200,7 +210,9 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
notif := notifications.New()
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment