Commit d0a53395 authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(bitswap) ACTIVATE FULL CONCURRENCY cap'n

fix(bitswap) Put synchronously. Then notify async
parent 88f5be3f
......@@ -62,6 +62,7 @@ type bitswap struct {
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
ctx, cancelFunc := context.WithCancel(parent)
// TODO add to wantlist
promise := bs.notifications.Subscribe(ctx, k)
go func() {
......@@ -69,8 +70,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
message := bsmsg.New()
message.AppendWanted(k)
for i := range peersToQuery {
func(p *peer.Peer) {
for iiiii := range peersToQuery {
go func(p *peer.Peer) {
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
return
......@@ -84,13 +85,14 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
return
}
bs.ReceiveMessage(ctx, p, response)
}(i)
}(iiiii)
}
}()
select {
case block := <-promise:
cancelFunc()
// TODO remove from wantlist
return &block, nil
case <-parent.Done():
return nil, parent.Err()
......@@ -115,18 +117,17 @@ func (bs *bitswap) ReceiveMessage(
return nil, nil, errors.New("Received nil Message")
}
bs.strategy.MessageReceived(p, incoming)
bs.strategy.MessageReceived(p, incoming) // FIRST
for _, block := range incoming.Blocks() {
err := bs.blockstore.Put(block) // FIXME(brian): err ignored
if err != nil {
return nil, nil, err
}
bs.notifications.Publish(block)
err = bs.HasBlock(ctx, block) // FIXME err ignored
if err != nil {
return nil, nil, err
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
continue // FIXME(brian): err ignored
}
go bs.notifications.Publish(block)
go func() {
_ = bs.HasBlock(ctx, block) // FIXME err ignored
}()
}
for _, key := range incoming.Wantlist() {
......@@ -148,7 +149,7 @@ func (bs *bitswap) ReceiveMessage(
// sent
func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m)
bs.strategy.MessageSent(p, m)
go bs.strategy.MessageSent(p, m)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
......@@ -157,7 +158,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
bs.send(ctx, p, message)
go bs.send(ctx, p, message)
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment