Unverified Commit d1f829be authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #110 from ipfs/experiment/provider-system-no-blocks

Control provider workers with experiment flag
parents 07ec9e84 67856544
......@@ -43,6 +43,8 @@ const (
)
var (
ProvideEnabled = true
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
......@@ -245,11 +247,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
bs.engine.AddBlock(blk)
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
if ProvideEnabled {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
return nil
}
......
......@@ -10,6 +10,7 @@ import (
decision "github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/message"
bssession "github.com/ipfs/go-bitswap/session"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
......@@ -99,6 +100,42 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}
}
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
ProvideEnabled = false
defer func() { ProvideEnabled = true }()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
g := NewTestSessionGenerator(net)
defer g.Close()
hasBlock := g.Next()
defer hasBlock.Exchange.Close()
if err := hasBlock.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
wantsBlock := g.Next()
defer wantsBlock.Exchange.Close()
ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)
// set find providers delay to less than timeout context of this test
ns.SetBaseTickDelay(10 * time.Millisecond)
received, err := ns.GetBlock(ctx, block.Cid())
if received != nil {
t.Fatalf("Expected to find nothing, found %s", received)
}
if err != context.DeadlineExceeded {
t.Fatal("Expected deadline exceeded")
}
}
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......
......@@ -23,15 +23,17 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
})
}
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
if ProvideEnabled {
// Start up a worker to manage sending out provides messages
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
px.Go(bs.provideWorker)
}
}
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment