Commit 0abce33f authored by Brian Tiger Chow's avatar Brian Tiger Chow

refac(exch:bitswap) always notify strategy when message sent

parent 7de1c505
...@@ -79,6 +79,9 @@ type bitswap struct { ...@@ -79,6 +79,9 @@ type bitswap struct {
// GetBlock attempts to retrieve a particular block from peers, within timeout. // GetBlock attempts to retrieve a particular block from peers, within timeout.
func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
*blocks.Block, error) { *blocks.Block, error) {
ctx, _ := context.WithTimeout(context.Background(), timeout)
// TODO replace timeout with ctx in routing interface
begin := time.Now() begin := time.Now()
tleft := timeout - time.Now().Sub(begin) tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
...@@ -90,7 +93,7 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( ...@@ -90,7 +93,7 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
go func() { go func() {
for p := range provs_ch { for p := range provs_ch {
go func(pr *peer.Peer) { go func(pr *peer.Peer) {
blk, err := bs.getBlock(k, pr, tleft) blk, err := bs.getBlock(ctx, k, pr)
if err != nil { if err != nil {
return return
} }
...@@ -111,19 +114,14 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( ...@@ -111,19 +114,14 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
} }
} }
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) { func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) {
ctx, _ := context.WithTimeout(context.Background(), timeout)
blockChannel := bs.notifications.Subscribe(ctx, k) blockChannel := bs.notifications.Subscribe(ctx, k)
message := bsmsg.New() message := bsmsg.New()
message.AppendWanted(k) message.AppendWanted(k)
// FIXME(brian): register the accountant on the service wrapper to ensure bs.send(ctx, p, message)
// that accounting is _always_ performed when SendMessage and
// ReceiveMessage are called
bs.sender.SendMessage(ctx, p, message)
bs.strategy.MessageSent(p, message)
block, ok := <-blockChannel block, ok := <-blockChannel
if !ok { if !ok {
...@@ -132,11 +130,13 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc ...@@ -132,11 +130,13 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
return &block, nil return &block, nil
} }
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) { func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
for _, p := range bs.strategy.Peers() { for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
go bs.send(p, block) message := bsmsg.New()
message.AppendBlock(block)
go bs.send(ctx, p, message)
} }
} }
} }
...@@ -145,16 +145,17 @@ func (bs *bitswap) sendToPeersThatWant(block blocks.Block) { ...@@ -145,16 +145,17 @@ func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
// HasBlock announces the existance of a block to bitswap, potentially sending // HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it. // it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(blk blocks.Block) error { func (bs *bitswap) HasBlock(blk blocks.Block) error {
go bs.sendToPeersThatWant(blk) ctx := context.TODO()
go bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(blk.Key()) return bs.routing.Provide(blk.Key())
} }
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage( func (bs *bitswap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage, error) {
bs.strategy.MessageReceived(sender, incoming) bs.strategy.MessageReceived(p, incoming)
if incoming.Blocks() != nil { if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
...@@ -165,26 +166,26 @@ func (bs *bitswap) ReceiveMessage( ...@@ -165,26 +166,26 @@ func (bs *bitswap) ReceiveMessage(
if incoming.Wantlist() != nil { if incoming.Wantlist() != nil {
for _, key := range incoming.Wantlist() { for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, sender) { if bs.strategy.ShouldSendBlockToPeer(key, p) {
block, errBlockNotFound := bs.blockstore.Get(key) block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil { if errBlockNotFound != nil {
// TODO(brian): log/return the error // TODO(brian): log/return the error
continue continue
} }
go bs.send(sender, *block) message := bsmsg.New()
message.AppendBlock(*block)
go bs.send(ctx, p, message)
} }
} }
} }
return nil, nil, errors.New("TODO implement") return nil, nil, errors.New("TODO implement")
} }
// TODO(brian): get a return value // send strives to ensure that accounting is always performed when a message is
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) { // sent
message := bsmsg.New() func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
message.AppendBlock(b) bs.sender.SendMessage(context.Background(), p, m)
// FIXME(brian): pass ctx bs.strategy.MessageSent(p, m)
bs.sender.SendMessage(context.Background(), p, message)
bs.strategy.MessageSent(p, message)
} }
func numBytes(b blocks.Block) int { func numBytes(b blocks.Block) int {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment