Commit 6e6c6638 authored by Jeromy's avatar Jeromy

implement a worker to consolidate HasBlock provide calls into one to alieviate memory pressure

parent 309229da
......@@ -89,6 +89,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
}
network.SetDelegate(bs)
......@@ -124,6 +125,8 @@ type Bitswap struct {
process process.Process
newBlocks chan *blocks.Block
provideKeys chan u.Key
}
type blockRequest struct {
......
......@@ -6,6 +6,7 @@ import (
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
u "github.com/jbenet/go-ipfs/util"
)
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
......@@ -24,6 +25,10 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
bs.rebroadcastWorker(ctx)
})
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
......@@ -58,13 +63,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
func (bs *Bitswap) provideWorker(ctx context.Context) {
for {
select {
case blk, ok := <-bs.newBlocks:
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("newBlocks channel closed")
log.Debug("provideKeys channel closed")
return
}
ctx, _ := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, blk.Key())
err := bs.network.Provide(ctx, k)
if err != nil {
log.Error(err)
}
......@@ -74,6 +79,51 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
}
}
func (bs *Bitswap) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toprovide []u.Key
var nextKey u.Key
select {
case blk, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}
nextKey = blk.Key()
case <-ctx.Done():
return
}
for {
select {
case blk, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}
toprovide = append(toprovide, blk.Key())
case bs.provideKeys <- nextKey:
if len(toprovide) > 0 {
nextKey = toprovide[0]
toprovide = toprovide[1:]
} else {
select {
case blk, ok := <-bs.newBlocks:
if !ok {
return
}
nextKey = blk.Key()
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}
// TODO ensure only one active request per key
func (bs *Bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment