Commit 22f0b879 authored by Jeromy's avatar Jeromy

fix panic in bitswap working limit spawning

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 9ca0be36
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
...@@ -74,11 +73,14 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -74,11 +73,14 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
func (bs *Bitswap) provideWorker(px process.Process) { func (bs *Bitswap) provideWorker(px process.Process) {
limiter := ratelimit.NewRateLimiter(px, provideWorkerMax) limit := make(chan struct{}, provideWorkerMax)
limitedGoProvide := func(k key.Key, wid int) { limitedGoProvide := func(k key.Key, wid int) {
defer func() {
// replace token when done
<-limit
}()
ev := logging.LoggableMap{"ID": wid} ev := logging.LoggableMap{"ID": wid}
limiter.LimitedGo(func(px process.Process) {
ctx := procctx.OnClosingContext(px) // derive ctx from px ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
...@@ -89,12 +91,10 @@ func (bs *Bitswap) provideWorker(px process.Process) { ...@@ -89,12 +91,10 @@ func (bs *Bitswap) provideWorker(px process.Process) {
if err := bs.network.Provide(ctx, k); err != nil { if err := bs.network.Provide(ctx, k); err != nil {
log.Error(err) log.Error(err)
} }
})
} }
// worker spawner, reads from bs.provideKeys until it closes, spawning a // worker spawner, reads from bs.provideKeys until it closes, spawning a
// _ratelimited_ number of workers to handle each key. // _ratelimited_ number of workers to handle each key.
limiter.Go(func(px process.Process) {
for wid := 2; ; wid++ { for wid := 2; ; wid++ {
ev := logging.LoggableMap{"ID": 1} ev := logging.LoggableMap{"ID": 1}
log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev)
...@@ -107,10 +107,14 @@ func (bs *Bitswap) provideWorker(px process.Process) { ...@@ -107,10 +107,14 @@ func (bs *Bitswap) provideWorker(px process.Process) {
log.Debug("provideKeys channel closed") log.Debug("provideKeys channel closed")
return return
} }
limitedGoProvide(k, wid) select {
case <-px.Closing():
return
case limit <- struct{}{}:
go limitedGoProvide(k, wid)
}
} }
} }
})
} }
func (bs *Bitswap) provideCollector(ctx context.Context) { func (bs *Bitswap) provideCollector(ctx context.Context) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment