From d0a5339547f876ed99bbe259439534dd83df980d Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow <brian.holderchow@gmail.com> Date: Fri, 19 Sep 2014 16:12:46 -0700 Subject: [PATCH] feat(bitswap) ACTIVATE FULL CONCURRENCY cap'n fix(bitswap) Put synchronously. Then notify async --- exchange/bitswap/bitswap.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 84cb52eb..0eaab521 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -62,6 +62,7 @@ type bitswap struct { func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { ctx, cancelFunc := context.WithCancel(parent) + // TODO add to wantlist promise := bs.notifications.Subscribe(ctx, k) go func() { @@ -69,8 +70,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders) message := bsmsg.New() message.AppendWanted(k) - for i := range peersToQuery { - func(p *peer.Peer) { + for iiiii := range peersToQuery { + go func(p *peer.Peer) { response, err := bs.sender.SendRequest(ctx, p, message) if err != nil { return @@ -84,13 +85,14 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) return } bs.ReceiveMessage(ctx, p, response) - }(i) + }(iiiii) } }() select { case block := <-promise: cancelFunc() + // TODO remove from wantlist return &block, nil case <-parent.Done(): return nil, parent.Err() @@ -115,18 +117,17 @@ func (bs *bitswap) ReceiveMessage( return nil, nil, errors.New("Received nil Message") } - bs.strategy.MessageReceived(p, incoming) + bs.strategy.MessageReceived(p, incoming) // FIRST for _, block := range incoming.Blocks() { - err := bs.blockstore.Put(block) // FIXME(brian): err ignored - if err != nil { - return nil, nil, err - } - bs.notifications.Publish(block) - err = bs.HasBlock(ctx, block) // FIXME err ignored - if err != nil { - return nil, nil, err + // TODO verify blocks? + if err := bs.blockstore.Put(block); err != nil { + continue // FIXME(brian): err ignored } + go bs.notifications.Publish(block) + go func() { + _ = bs.HasBlock(ctx, block) // FIXME err ignored + }() } for _, key := range incoming.Wantlist() { @@ -148,7 +149,7 @@ func (bs *bitswap) ReceiveMessage( // sent func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) { bs.sender.SendMessage(ctx, p, m) - bs.strategy.MessageSent(p, m) + go bs.strategy.MessageSent(p, m) } func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { @@ -157,7 +158,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() message.AppendBlock(block) - bs.send(ctx, p, message) + go bs.send(ctx, p, message) } } } -- GitLab