Commit 88853d99 authored by Jeromy's avatar Jeromy

add worker to bitswap for reproviding new blocks

parent 6896d8f0
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
...@@ -37,9 +36,13 @@ const ( ...@@ -37,9 +36,13 @@ const (
maxProvidersPerRequest = 3 maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10 providerRequestTimeout = time.Second * 10
hasBlockTimeout = time.Second * 15 hasBlockTimeout = time.Second * 15
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32 sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol // kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32 kMaxPriority = math.MaxInt32
hasBlockBufferSize = 256
provideWorkers = 4
) )
var ( var (
...@@ -86,18 +89,12 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -86,18 +89,12 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
wantlist: wantlist.NewThreadSafe(), wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan), batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, hasBlockBufferSize),
} }
network.SetDelegate(bs) network.SetDelegate(bs)
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
})
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})
// Start up bitswaps async worker routines
bs.startWorkers(px, ctx)
return bs return bs
} }
...@@ -126,6 +123,8 @@ type bitswap struct { ...@@ -126,6 +123,8 @@ type bitswap struct {
wantlist *wantlist.ThreadSafe wantlist *wantlist.ThreadSafe
process process.Process process process.Process
newBlocks chan *blocks.Block
} }
type blockRequest struct { type blockRequest struct {
...@@ -172,7 +171,6 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -172,7 +171,6 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
case <-parent.Done(): case <-parent.Done():
return nil, parent.Err() return nil, parent.Err()
} }
} }
// GetBlocks returns a channel where the caller may receive blocks that // GetBlocks returns a channel where the caller may receive blocks that
...@@ -205,6 +203,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ...@@ -205,6 +203,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Event(ctx, "hasBlock", blk)
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return errors.New("bitswap is closed") return errors.New("bitswap is closed")
...@@ -215,7 +214,12 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -215,7 +214,12 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
} }
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
return bs.network.Provide(ctx, blk.Key()) select {
case bs.newBlocks <- blk:
case <-ctx.Done():
return ctx.Err()
}
return nil
} }
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
...@@ -310,6 +314,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -310,6 +314,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Debug(err) log.Debug(err)
} }
} }
var keys []u.Key var keys []u.Key
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
keys = append(keys, block.Key()) keys = append(keys, block.Key())
...@@ -391,82 +396,3 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) ...@@ -391,82 +396,3 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
func (bs *bitswap) Close() error { func (bs *bitswap) Close() error {
return bs.process.Close() return bs.process.Close()
} }
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
for {
select {
case req := <-bs.batchRequests:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
bs.wantNewBlocks(req.ctx, keys)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()
broadcastSignal := time.After(rebroadcastDelay.Get())
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done():
return
}
}
}
package bitswap
import (
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
})
// Start up a worker to handle requests from other nodes for the data on this node
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
// Start up a worker to manage periodically resending our wantlist out to peers
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})
// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
for i := 0; i < provideWorkers; i++ {
px.Go(func(px process.Process) {
bs.blockReceiveWorker(ctx)
})
}
}
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case nextEnvelope := <-bs.engine.Outbox():
select {
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
func (bs *bitswap) blockReceiveWorker(ctx context.Context) {
for {
select {
case blk, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}
ctx, _ := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, blk.Key())
if err != nil {
log.Error(err)
}
case <-ctx.Done():
return
}
}
}
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
for {
select {
case req := <-bs.batchRequests:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}
bs.wantNewBlocks(req.ctx, keys)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()
broadcastSignal := time.After(rebroadcastDelay.Get())
for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done():
return
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment