diff --git a/bitswap.go b/bitswap.go index e87157573418279d6e243c5e785b99e518259d0e..0297c0989dd4c6afcdd6513b80423f1a204d1e11 100644 --- a/bitswap.go +++ b/bitswap.go @@ -5,6 +5,7 @@ package bitswap import ( "context" "errors" + "fmt" "sync" "time" @@ -45,6 +46,9 @@ const ( // these requests take at _least_ two minutes at the moment. provideTimeout = time.Minute * 3 defaultProvSearchDelay = time.Second + + // Number of concurrent workers in decision engine that process requests to the blockstore + defaulEngineBlockstoreWorkerCount = 128 ) var ( @@ -85,6 +89,17 @@ func RebroadcastDelay(newRebroadcastDelay delay.D) Option { } } +// EngineBlockstoreWorkerCount sets the number of worker threads used for +// blockstore operations in the decision engine +func EngineBlockstoreWorkerCount(count int) Option { + if count <= 0 { + panic(fmt.Sprintf("Engine blockstore worker count is %d but must be > 0", count)) + } + return func(bs *Bitswap) { + bs.engineBstoreWorkerCount = count + } +} + // SetSendDontHaves indicates what to do when the engine receives a want-block // for a block that is not in the blockstore. Either // - Send a DONT_HAVE message @@ -99,7 +114,7 @@ func SetSendDontHaves(send bool) Option { // Configures the engine to use the given score decision logic. func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option { return func(bs *Bitswap) { - bs.engine.UseScoreLedger(scoreLedger) + bs.engineScoreLedger = scoreLedger } } @@ -166,27 +181,26 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, } notif := notifications.New() sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) - engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self()) bs := &Bitswap{ blockstore: bstore, - engine: engine, network: network, process: px, newBlocks: make(chan cid.Cid, HasBlockBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize), pm: pm, pqm: pqm, - sm: sm, - sim: sim, - notif: notif, - counters: new(counters), - dupMetric: dupHist, - allMetric: allHist, - sentHistogram: sentHistogram, - provideEnabled: true, - provSearchDelay: defaultProvSearchDelay, - rebroadcastDelay: delay.Fixed(time.Minute), + sm: sm, + sim: sim, + notif: notif, + counters: new(counters), + dupMetric: dupHist, + allMetric: allHist, + sentHistogram: sentHistogram, + provideEnabled: true, + provSearchDelay: defaultProvSearchDelay, + rebroadcastDelay: delay.Fixed(time.Minute), + engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount, } // apply functional options before starting and running bitswap @@ -194,12 +208,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, option(bs) } + // Set up decision engine + bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger) + bs.pqm.Startup() network.SetDelegate(bs) // Start up bitswaps async worker routines bs.startWorkers(ctx, px) - engine.StartWorkers(ctx, px) + bs.engine.StartWorkers(ctx, px) // bind the context and process. // do it over here to avoid closing before all setup is done. @@ -270,6 +287,12 @@ type Bitswap struct { // how often to rebroadcast providing requests to find more optimized providers rebroadcastDelay delay.D + + // how many worker threads to start for decision engine blockstore worker + engineBstoreWorkerCount int + + // the score ledger used by the decision engine + engineScoreLedger deciface.ScoreLedger } type counters struct { diff --git a/internal/decision/blockstoremanager.go b/internal/decision/blockstoremanager.go index 8d880a6c483f14e79730ea7dec86972cc90a754d..1cc09dffcf9d878979c02fd38acb59a2071e02e0 100644 --- a/internal/decision/blockstoremanager.go +++ b/internal/decision/blockstoremanager.go @@ -21,7 +21,7 @@ type blockstoreManager struct { // newBlockstoreManager creates a new blockstoreManager with the given context // and number of workers -func newBlockstoreManager(ctx context.Context, bs bstore.Blockstore, workerCount int) *blockstoreManager { +func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager { return &blockstoreManager{ bs: bs, workerCount: workerCount, diff --git a/internal/decision/blockstoremanager_test.go b/internal/decision/blockstoremanager_test.go index cac0a5b0ebb372815daaf1d3cdede89cfc20ed7d..49a10c50cdeb92e54fa5b8e8e76149af66b07fd5 100644 --- a/internal/decision/blockstoremanager_test.go +++ b/internal/decision/blockstoremanager_test.go @@ -25,7 +25,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) { dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)) - bsm := newBlockstoreManager(ctx, bstore, 5) + bsm := newBlockstoreManager(bstore, 5) bsm.start(process.WithTeardown(func() error { return nil })) cids := testutil.GenerateCids(4) @@ -64,7 +64,7 @@ func TestBlockstoreManager(t *testing.T) { dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)) - bsm := newBlockstoreManager(ctx, bstore, 5) + bsm := newBlockstoreManager(bstore, 5) bsm.start(process.WithTeardown(func() error { return nil })) exp := make(map[cid.Cid]blocks.Block) @@ -148,7 +148,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) { bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)) workerCount := 5 - bsm := newBlockstoreManager(ctx, bstore, workerCount) + bsm := newBlockstoreManager(bstore, workerCount) bsm.start(process.WithTeardown(func() error { return nil })) blkSize := int64(8 * 1024) @@ -190,7 +190,7 @@ func TestBlockstoreManagerClose(t *testing.T) { dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)) - bsm := newBlockstoreManager(ctx, bstore, 3) + bsm := newBlockstoreManager(bstore, 3) px := process.WithTeardown(func() error { return nil }) bsm.start(px) @@ -227,7 +227,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) { dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)) - bsm := newBlockstoreManager(context.Background(), bstore, 3) + bsm := newBlockstoreManager(bstore, 3) proc := process.WithTeardown(func() error { return nil }) bsm.start(proc) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 62957d611d83ae5177e815368c53a31062f08a1c..6e69ca6578dc01151afc9035f7af17b9ae8a6ab6 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -76,9 +76,6 @@ const ( // Number of concurrent workers that pull tasks off the request queue taskWorkerCount = 8 - - // Number of concurrent workers that process requests to the blockstore - blockstoreWorkerCount = 128 ) // Envelope contains a message for a Peer. @@ -166,16 +163,16 @@ type Engine struct { sendDontHaves bool - self peer.ID + self peer.ID } // NewEngine creates a new block sending engine for the given block store -func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine { - return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil) +func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine { + return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger) } // This constructor is used by the tests -func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, +func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, maxReplaceSize int, scoreLedger ScoreLedger) *Engine { if scoreLedger == nil { @@ -185,7 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, e := &Engine{ ledgerMap: make(map[peer.ID]*ledger), scoreLedger: scoreLedger, - bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount), + bsm: newBlockstoreManager(bs, bstoreWorkerCount), peerTagger: peerTagger, outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), @@ -215,12 +212,6 @@ func (e *Engine) SetSendDontHaves(send bool) { e.sendDontHaves = send } -// Sets the scoreLedger to the given implementation. Should be called -// before StartWorkers(). -func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) { - e.scoreLedger = scoreLedger -} - // Starts the score ledger. Before start the function checks and, // if it is unset, initializes the scoreLedger with the default // implementation. diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index 3046dc0d1d5ecb28a15390816172f08c8dcad974..b4f3d068e0b219dce620dd5f7c53c3b1378c8727 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet { func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet { fpt := &fakePeerTagger{} bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh)) + e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh)) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) return engineSet{ Peer: peer.ID(idStr), @@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool { func TestOutboxClosedWhenEngineClosed(t *testing.T) { ctx := context.Background() t.SkipNow() // TODO implement *Engine.Close - e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) var wg sync.WaitGroup wg.Add(1) @@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) { testCases = onlyTestCases } - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) for i, testCase := range testCases { t.Logf("Test case %d:", i) @@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) { testCases = onlyTestCases } - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) var next envChan @@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { ctx := context.Background() for i := 0; i < numRounds; i++ { expected := make([][]string, 0, len(testcases)) - e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) for _, testcase := range testcases { set := testcase[0] @@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) blks := testutil.GenerateBlocksOfSize(4, 8*1024) @@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) blks := testutil.GenerateBlocksOfSize(4, 8*1024) @@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) { partner := libp2ptest.RandPeerIDFatal(t) otherPeer := libp2ptest.RandPeerIDFatal(t) - e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) + e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil)) e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil })) blks := testutil.GenerateBlocksOfSize(4, 8*1024)