Unverified Commit 47b99b1c authored by dirkmc's avatar dirkmc Committed by GitHub

feat: configurable engine blockstore worker count (#449)

parent 95cb1a00
......@@ -5,6 +5,7 @@ package bitswap
import (
......@@ -45,6 +46,9 @@ const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second
// Number of concurrent workers in decision engine that process requests to the blockstore
defaulEngineBlockstoreWorkerCount = 128
var (
......@@ -85,6 +89,17 @@ func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
// EngineBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func EngineBlockstoreWorkerCount(count int) Option {
if count <= 0 {
panic(fmt.Sprintf("Engine blockstore worker count is %d but must be > 0", count))
return func(bs *Bitswap) {
bs.engineBstoreWorkerCount = count
// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
......@@ -99,7 +114,7 @@ func SetSendDontHaves(send bool) Option {
// Configures the engine to use the given score decision logic.
func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
return func(bs *Bitswap) {
bs.engineScoreLedger = scoreLedger
......@@ -166,27 +181,26 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
notif := notifications.New()
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())
bs := &Bitswap{
blockstore: bstore,
engine: engine,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
// apply functional options before starting and running bitswap
......@@ -194,12 +208,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
// Set up decision engine
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)
engine.StartWorkers(ctx, px)
bs.engine.StartWorkers(ctx, px)
// bind the context and process.
// do it over here to avoid closing before all setup is done.
......@@ -270,6 +287,12 @@ type Bitswap struct {
// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D
// how many worker threads to start for decision engine blockstore worker
engineBstoreWorkerCount int
// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger
type counters struct {
......@@ -21,7 +21,7 @@ type blockstoreManager struct {
// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(ctx context.Context, bs bstore.Blockstore, workerCount int) *blockstoreManager {
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
......@@ -25,7 +25,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
cids := testutil.GenerateCids(4)
......@@ -64,7 +64,7 @@ func TestBlockstoreManager(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))
exp := make(map[cid.Cid]blocks.Block)
......@@ -148,7 +148,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
workerCount := 5
bsm := newBlockstoreManager(ctx, bstore, workerCount)
bsm := newBlockstoreManager(bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))
blkSize := int64(8 * 1024)
......@@ -190,7 +190,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(ctx, bstore, 3)
bsm := newBlockstoreManager(bstore, 3)
px := process.WithTeardown(func() error { return nil })
......@@ -227,7 +227,7 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
bsm := newBlockstoreManager(context.Background(), bstore, 3)
bsm := newBlockstoreManager(bstore, 3)
proc := process.WithTeardown(func() error { return nil })
......@@ -76,9 +76,6 @@ const (
// Number of concurrent workers that pull tasks off the request queue
taskWorkerCount = 8
// Number of concurrent workers that process requests to the blockstore
blockstoreWorkerCount = 128
// Envelope contains a message for a Peer.
......@@ -166,16 +163,16 @@ type Engine struct {
sendDontHaves bool
self peer.ID
self peer.ID
// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine {
return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger)
// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, scoreLedger ScoreLedger) *Engine {
if scoreLedger == nil {
......@@ -185,7 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: scoreLedger,
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
bsm: newBlockstoreManager(bs, bstoreWorkerCount),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
......@@ -215,12 +212,6 @@ func (e *Engine) SetSendDontHaves(send bool) {
e.sendDontHaves = send
// Sets the scoreLedger to the given implementation. Should be called
// before StartWorkers().
func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) {
e.scoreLedger = scoreLedger
// Starts the score ledger. Before start the function checks and,
// if it is unset, initializes the scoreLedger with the default
// implementation.
......@@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
......@@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
......@@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
......@@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
var next envChan
......@@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
......@@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
blks := testutil.GenerateBlocksOfSize(4, 8*1024)
......@@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
blks := testutil.GenerateBlocksOfSize(4, 8*1024)
......@@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
blks := testutil.GenerateBlocksOfSize(4, 8*1024)
