Commit 61599656 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refactor(bitswap) consolidate HasBlock

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>

Conflicts:
	exchange/bitswap/bitswap.go
parent 00404873
......@@ -248,30 +248,19 @@ func (bs *bitswap) loop(parent context.Context) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
// TODO check all errors
log.Debugf("Has Block %s", blk.Key())
if err := bs.blockstore.Put(blk); err != nil {
return err
}
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk)
child, _ := context.WithTimeout(ctx, hasBlockTimeout)
bs.sendToPeersThatWant(child, blk)
if err := bs.sendToPeersThatWant(child, blk); err != nil {
return err
}
child, _ = context.WithTimeout(ctx, hasBlockTimeout)
return bs.routing.Provide(child, blk.Key())
}
// receiveBlock handles storing the block in the blockstore and calling HasBlock
func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
log.Criticalf("error putting block: %s", err)
return
}
err := bs.HasBlock(ctx, block)
if err != nil {
log.Warningf("HasBlock errored: %s", err)
}
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
......@@ -297,7 +286,9 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
go func() {
for _, block := range incoming.Blocks() {
bs.receiveBlock(ctx, block)
if err := bs.HasBlock(ctx, block); err != nil {
log.Error(err)
}
}
}()
......@@ -334,27 +325,29 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m)
bs.strategy.MessageSent(p, m)
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) error {
if err := bs.sender.SendMessage(ctx, p, m); err != nil {
return err
}
return bs.strategy.MessageSent(p, m)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
log.Debugf("Sending %s to peers that want it", block)
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) error {
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
log.Debugf("%v wants %v", p, block.Key())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AddBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
bs.send(ctx, p, message)
if err := bs.send(ctx, p, message); err != nil {
return err
}
}
}
}
return nil
}
func (bs *bitswap) Close() error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment