Commit 5c5b77bb authored by Jeromy's avatar Jeromy

allow bitswap to attempt to write blocks to disk multiple times

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 75237256
...@@ -228,7 +228,9 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -228,7 +228,9 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
default: default:
} }
if err := bs.blockstore.Put(blk); err != nil { err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
if err != nil {
log.Errorf("Error writing block to datastore: %s", err)
return err return err
} }
...@@ -242,6 +244,18 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -242,6 +244,18 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return nil return nil
} }
func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
var err error
for i := 0; i < attempts; i++ {
if err = bs.blockstore.Put(blk); err == nil {
break
}
time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
}
return err
}
func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
...@@ -297,38 +311,46 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -297,38 +311,46 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg.Add(1) wg.Add(1)
go func(b *blocks.Block) { go func(b *blocks.Block) {
defer wg.Done() defer wg.Done()
bs.counterLk.Lock()
bs.blocksRecvd++ if err := bs.updateReceiveCounters(b.Key()); err != nil {
has, err := bs.blockstore.Has(b.Key()) return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
if err != nil {
bs.counterLk.Unlock()
log.Infof("blockstore.Has error: %s", err)
return
}
if err == nil && has {
bs.dupBlocksRecvd++
}
brecvd := bs.blocksRecvd
bdup := bs.dupBlocksRecvd
bs.counterLk.Unlock()
if has {
return
} }
k := b.Key() k := b.Key()
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) log.Debugf("got block %s from %s", b, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
defer cancel()
if err := bs.HasBlock(hasBlockCtx, b); err != nil { if err := bs.HasBlock(hasBlockCtx, b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err) log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
cancel()
}(block) }(block)
} }
wg.Wait() wg.Wait()
} }
var ErrAlreadyHaveBlock = errors.New("already have block")
func (bs *Bitswap) updateReceiveCounters(k key.Key) error {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
bs.blocksRecvd++
has, err := bs.blockstore.Has(k)
if err != nil {
log.Infof("blockstore.Has error: %s", err)
return err
}
if err == nil && has {
bs.dupBlocksRecvd++
}
if has {
return ErrAlreadyHaveBlock
}
return nil
}
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) { func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p) bs.wm.Connected(p)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment