Commit 06b49918 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

bitswap/provide: improved rate limiting

this PR greatly speeds up providing and add.

(1) Instead of idling workers, we move to a ratelimiter-based worker.
We put this max at 512, so that means _up to_ 512 goroutines. This
is very small load on the node, as each worker is providing to the
dht, which means mostly waiting. It DOES put a large load on the DHT.
but i want to try this out for a while and see if it's a problem.
We can decide later if it is a problem for the network (nothing
stops anyone from re-compiling, but the defaults of course matter).

(2) We add a buffer size for provideKeys, which means that we block
the add process much less. this is a very cheap buffer, as it only
stores keys (it may be even cheaper with a lock + ring buffer
instead of a channel...). This makes add blazing fast-- it was being
rate limited by providing. Add should not be ratelimited by providing
(much, if any) as the user wants to just store the stuff in the local
node's repo. This buffer is initially set to 4096, which means:

  4096 * keysize (~258 bytes + go overhead) ~ 1-1.5MB

this buffer only last a few sec to mins, and is an ok thing to do
for the sake of very fast adds. (this could be a configurable
paramter, certainly for low-mem footprint use cases). At the moment
this is not much, compared to block sizes.

(3) We make the providing EventBegin() + Done(), so that we can
track how long a provide takes, and we can remove workers as they
finish in bsdash and similar tools.

License: MIT
Signed-off-by: default avatarJuan Batiz-Benet <juan@benet.ai>
parent 078db5de
...@@ -39,8 +39,9 @@ const ( ...@@ -39,8 +39,9 @@ const (
// kMaxPriority is the max priority as defined by the bitswap protocol // kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32 kMaxPriority = math.MaxInt32
HasBlockBufferSize = 256 HasBlockBufferSize = 256
provideWorkers = 4 provideKeysBufferSize = 2048
provideWorkerMax = 512
) )
var rebroadcastDelay = delay.Fixed(time.Second * 10) var rebroadcastDelay = delay.Fixed(time.Second * 10)
...@@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -85,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
findKeys: make(chan *blockRequest, sizeBatchRequestChan), findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key), provideKeys: make(chan key.Key, provideKeysBufferSize),
wm: NewWantManager(ctx, network), wm: NewWantManager(ctx, network),
} }
go bs.wm.Run() go bs.wm.Run()
......
package bitswap package bitswap
import ( import (
"os"
"strconv"
"time" "time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
waitable "github.com/ipfs/go-ipfs/thirdparty/waitable"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
...@@ -14,22 +14,6 @@ import ( ...@@ -14,22 +14,6 @@ import (
var TaskWorkerCount = 8 var TaskWorkerCount = 8
func init() {
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
if twc != "" {
n, err := strconv.Atoi(twc)
if err != nil {
log.Error(err)
return
}
if n > 0 {
TaskWorkerCount = n
} else {
log.Errorf("Invalid value of '%d' for IPFS_BITSWAP_TASK_WORKERS", n)
}
}
}
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making // Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
...@@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { ...@@ -57,12 +41,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Spawn up multiple workers to handle incoming blocks // Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks // consider increasing number if providing blocks bottlenecks
// file transfers // file transfers
for i := 0; i < provideWorkers; i++ { px.Go(bs.provideWorker)
i := i
px.Go(func(px process.Process) {
bs.provideWorker(ctx, i)
})
}
} }
func (bs *Bitswap) taskWorker(ctx context.Context, id int) { func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
...@@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -77,7 +56,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
if !ok { if !ok {
continue continue
} }
log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()}) log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{
"ID": id,
"Target": envelope.Peer.Pretty(),
"Block": envelope.Block.Multihash.B58String(),
})
bs.wm.SendBlock(ctx, envelope) bs.wm.SendBlock(ctx, envelope)
case <-ctx.Done(): case <-ctx.Done():
...@@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -89,27 +72,45 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
} }
} }
func (bs *Bitswap) provideWorker(ctx context.Context, id int) { func (bs *Bitswap) provideWorker(px process.Process) {
idmap := eventlog.LoggableMap{"ID": id}
for { limiter := ratelimit.NewRateLimiter(px, provideWorkerMax)
log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
select { limitedGoProvide := func(k key.Key, wid int) {
case k, ok := <-bs.provideKeys: ev := eventlog.LoggableMap{"ID": wid}
log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k) limiter.LimitedGo(func(px process.Process) {
if !ok {
log.Debug("provideKeys channel closed") ctx := waitable.Context(px) // derive ctx from px
return defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
}
ctx, cancel := context.WithTimeout(ctx, provideTimeout) ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
err := bs.network.Provide(ctx, k) defer cancel()
if err != nil {
if err := bs.network.Provide(ctx, k); err != nil {
log.Error(err) log.Error(err)
} }
cancel() })
case <-ctx.Done():
return
}
} }
// worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key.
limiter.Go(func(px process.Process) {
for wid := 2; ; wid++ {
ev := eventlog.LoggableMap{"ID": 1}
log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev)
select {
case <-px.Closing():
return
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
limitedGoProvide(k, wid)
}
}
})
} }
func (bs *Bitswap) provideCollector(ctx context.Context) { func (bs *Bitswap) provideCollector(ctx context.Context) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment