Commit ef92b55d authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(exch:bitswap) simply get method

parent 640fa135
......@@ -65,63 +65,38 @@ type bitswap struct {
// deadline enforced by the context
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(ctx context.Context, k u.Key) (*blocks.Block, error) {
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
const maxProviders = 20
provs_ch := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
blockChannel := make(chan blocks.Block)
ctx, cancelFunc := context.WithCancel(parent)
promise := bs.notifications.Subscribe(ctx, k)
// TODO: when the data is received, shut down this for loop ASAP
go func() {
for p := range provs_ch {
go func(pr *peer.Peer) {
blk, err := bs.getBlock(ctx, k, pr)
const maxProviders = 20
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
message := bsmsg.New()
message.AppendWanted(k)
for i := range peersToQuery {
go func(p *peer.Peer) {
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
return
}
select {
case blockChannel <- *blk:
default:
}
}(p)
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.strategy.MessageSent(p, message)
bs.ReceiveMessage(ctx, p, response)
}(i)
}
}()
select {
case block := <-blockChannel:
close(blockChannel)
case block := <-promise:
cancelFunc()
return &block, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) {
blockChannel := bs.notifications.Subscribe(ctx, k)
message := bsmsg.New()
message.AppendWanted(k)
bs.send(ctx, p, message)
block, ok := <-blockChannel
if !ok {
return nil, u.ErrTimeout
}
return &block, nil
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
go bs.send(ctx, p, message)
}
}
case <-parent.Done():
return nil, parent.Err()
}
}
......@@ -173,3 +148,15 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
func numBytes(b blocks.Block) int {
return len(b.Data)
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
go bs.send(ctx, p, message)
}
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment