Commit b5bd29a5 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #1290 from ipfs/feat/bitswap-speed

move findproviders out of critical path
parents 35fc7c15 2ec4c9ac
...@@ -82,7 +82,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -82,7 +82,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network, network: network,
batchRequests: make(chan *blockRequest, sizeBatchRequestChan), findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key), provideKeys: make(chan u.Key),
...@@ -115,10 +115,8 @@ type Bitswap struct { ...@@ -115,10 +115,8 @@ type Bitswap struct {
notifications notifications.PubSub notifications notifications.PubSub
// Requests for a set of related blocks // send keys to a worker to find and connect to providers for them
// the assumption is made that the same peer is likely to findKeys chan *blockRequest
// have more than a single block in the set
batchRequests chan *blockRequest
engine *decision.Engine engine *decision.Engine
...@@ -202,12 +200,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ...@@ -202,12 +200,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
} }
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
bs.wm.WantBlocks(keys)
req := &blockRequest{ req := &blockRequest{
keys: keys, keys: keys,
ctx: ctx, ctx: ctx,
} }
select { select {
case bs.batchRequests <- req: case bs.findKeys <- req:
return promise, nil return promise, nil
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
...@@ -270,39 +270,59 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -270,39 +270,59 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// TODO: this is bad, and could be easily abused. // TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger // Should only track *useful* messages in ledger
if len(incoming.Blocks()) == 0 { iblocks := incoming.Blocks()
if len(iblocks) == 0 {
return return
} }
// quickly send out cancels, reduces chances of duplicate block receives // quickly send out cancels, reduces chances of duplicate block receives
var keys []u.Key var keys []u.Key
for _, block := range incoming.Blocks() { for _, block := range iblocks {
if _, found := bs.wm.wl.Contains(block.Key()); !found {
log.Notice("received un-asked-for block: %s", block)
continue
}
keys = append(keys, block.Key()) keys = append(keys, block.Key())
} }
bs.wm.CancelWants(keys) bs.wm.CancelWants(keys)
for _, block := range incoming.Blocks() { wg := sync.WaitGroup{}
for _, block := range iblocks {
wg.Add(1)
go func(b *blocks.Block) {
defer wg.Done()
bs.counterLk.Lock() bs.counterLk.Lock()
bs.blocksRecvd++ bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has { has, err := bs.blockstore.Has(b.Key())
if err != nil {
bs.counterLk.Unlock()
log.Noticef("blockstore.Has error: %s", err)
return
}
if err == nil && has {
bs.dupBlocksRecvd++ bs.dupBlocksRecvd++
} }
brecvd := bs.blocksRecvd brecvd := bs.blocksRecvd
bdup := bs.dupBlocksRecvd bdup := bs.dupBlocksRecvd
bs.counterLk.Unlock() bs.counterLk.Unlock()
log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) if has {
return
}
log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil { if err := bs.HasBlock(hasBlockCtx, b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err) log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
cancel() cancel()
}(block)
} }
wg.Wait()
} }
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) { func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
bs.wm.Connected(p) bs.wm.Connected(p)
} }
...@@ -313,7 +333,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { ...@@ -313,7 +333,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) {
} }
func (bs *Bitswap) ReceiveError(err error) { func (bs *Bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err) log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error // TODO log the network error
// TODO bubble the network error up to the parent context/error logger // TODO bubble the network error up to the parent context/error logger
} }
......
...@@ -21,7 +21,7 @@ type WantManager struct { ...@@ -21,7 +21,7 @@ type WantManager struct {
// synchronized by Run loop, only touch inside there // synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue peers map[peer.ID]*msgQueue
wl *wantlist.Wantlist wl *wantlist.ThreadSafe
network bsnet.BitSwapNetwork network bsnet.BitSwapNetwork
ctx context.Context ctx context.Context
...@@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana ...@@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
connect: make(chan peer.ID, 10), connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10),
peers: make(map[peer.ID]*msgQueue), peers: make(map[peer.ID]*msgQueue),
wl: wantlist.New(), wl: wantlist.NewThreadSafe(),
network: network, network: network,
ctx: ctx, ctx: ctx,
} }
......
...@@ -31,7 +31,7 @@ func init() { ...@@ -31,7 +31,7 @@ func init() {
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making // Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
bs.clientWorker(ctx) bs.providerConnector(ctx)
}) })
// Start up workers to handle requests from other nodes for the data on this node // Start up workers to handle requests from other nodes for the data on this node
...@@ -134,21 +134,19 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { ...@@ -134,21 +134,19 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
} }
} }
// TODO ensure only one active request per key // connects to providers for the given keys
func (bs *Bitswap) clientWorker(parent context.Context) { func (bs *Bitswap) providerConnector(parent context.Context) {
defer log.Info("bitswap client worker shutting down...") defer log.Info("bitswap client worker shutting down...")
for { for {
select { select {
case req := <-bs.batchRequests: case req := <-bs.findKeys:
keys := req.keys keys := req.keys
if len(keys) == 0 { if len(keys) == 0 {
log.Warning("Received batch request for zero blocks") log.Warning("Received batch request for zero blocks")
continue continue
} }
bs.wm.WantBlocks(keys)
// NB: Optimization. Assumes that providers of key[0] are likely to // NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most // be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true. // every situation. Later, this assumption may not hold as true.
......
...@@ -75,7 +75,7 @@ func TestBitswapWithoutRouting(t *testing.T) { ...@@ -75,7 +75,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
} }
log.Debugf("%d %s get block.", i, n.Identity) log.Debugf("%d %s get block.", i, n.Identity)
b, err := n.Exchange.GetBlock(ctx, block0.Key()) b, err := n.Blocks.GetBlock(ctx, block0.Key())
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} else if !bytes.Equal(b.Data, block0.Data) { } else if !bytes.Equal(b.Data, block0.Data) {
...@@ -92,7 +92,7 @@ func TestBitswapWithoutRouting(t *testing.T) { ...@@ -92,7 +92,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
// get it out. // get it out.
for _, n := range nodes { for _, n := range nodes {
b, err := n.Exchange.GetBlock(ctx, block1.Key()) b, err := n.Blocks.GetBlock(ctx, block1.Key())
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} else if !bytes.Equal(b.Data, block1.Data) { } else if !bytes.Equal(b.Data, block1.Data) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment