From d82a2517d1ccaa8fa5680836ccc2d7bc6ff59912 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow <brian.holderchow@gmail.com> Date: Thu, 18 Sep 2014 19:01:06 -0700 Subject: [PATCH] refac(exch:bitswap) always notify strategy when message sent --- exchange/bitswap/bitswap.go | 45 +++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 96749462..f012e804 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -79,6 +79,9 @@ type bitswap struct { // GetBlock attempts to retrieve a particular block from peers, within timeout. func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( *blocks.Block, error) { + ctx, _ := context.WithTimeout(context.Background(), timeout) + + // TODO replace timeout with ctx in routing interface begin := time.Now() tleft := timeout - time.Now().Sub(begin) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) @@ -90,7 +93,7 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( go func() { for p := range provs_ch { go func(pr *peer.Peer) { - blk, err := bs.getBlock(k, pr, tleft) + blk, err := bs.getBlock(ctx, k, pr) if err != nil { return } @@ -111,19 +114,14 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) ( } } -func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) { +func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) { - ctx, _ := context.WithTimeout(context.Background(), timeout) blockChannel := bs.notifications.Subscribe(ctx, k) message := bsmsg.New() message.AppendWanted(k) - // FIXME(brian): register the accountant on the service wrapper to ensure - // that accounting is _always_ performed when SendMessage and - // ReceiveMessage are called - bs.sender.SendMessage(ctx, p, message) - bs.strategy.MessageSent(p, message) + bs.send(ctx, p, message) block, ok := <-blockChannel if !ok { @@ -132,11 +130,13 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc return &block, nil } -func (bs *bitswap) sendToPeersThatWant(block blocks.Block) { +func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { - go bs.send(p, block) + message := bsmsg.New() + message.AppendBlock(block) + go bs.send(ctx, p, message) } } } @@ -145,16 +145,17 @@ func (bs *bitswap) sendToPeersThatWant(block blocks.Block) { // HasBlock announces the existance of a block to bitswap, potentially sending // it to peers (Partners) whose WantLists include it. func (bs *bitswap) HasBlock(blk blocks.Block) error { - go bs.sendToPeersThatWant(blk) + ctx := context.TODO() + go bs.sendToPeersThatWant(ctx, blk) return bs.routing.Provide(blk.Key()) } // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage( - ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( + ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( *peer.Peer, bsmsg.BitSwapMessage, error) { - bs.strategy.MessageReceived(sender, incoming) + bs.strategy.MessageReceived(p, incoming) if incoming.Blocks() != nil { for _, block := range incoming.Blocks() { @@ -165,26 +166,26 @@ func (bs *bitswap) ReceiveMessage( if incoming.Wantlist() != nil { for _, key := range incoming.Wantlist() { - if bs.strategy.ShouldSendBlockToPeer(key, sender) { + if bs.strategy.ShouldSendBlockToPeer(key, p) { block, errBlockNotFound := bs.blockstore.Get(key) if errBlockNotFound != nil { // TODO(brian): log/return the error continue } - go bs.send(sender, *block) + message := bsmsg.New() + message.AppendBlock(*block) + go bs.send(ctx, p, message) } } } return nil, nil, errors.New("TODO implement") } -// TODO(brian): get a return value -func (bs *bitswap) send(p *peer.Peer, b blocks.Block) { - message := bsmsg.New() - message.AppendBlock(b) - // FIXME(brian): pass ctx - bs.sender.SendMessage(context.Background(), p, message) - bs.strategy.MessageSent(p, message) +// send strives to ensure that accounting is always performed when a message is +// sent +func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) { + bs.sender.SendMessage(context.Background(), p, m) + bs.strategy.MessageSent(p, m) } func numBytes(b blocks.Block) int { -- GitLab