Commit 2a4a6d3e authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

feat(bitswap) implement GetBlocks

@whyrusleeping @jbenet

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 470b02d4
...@@ -79,9 +79,7 @@ type bitswap struct { ...@@ -79,9 +79,7 @@ type bitswap struct {
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context // deadline enforced by the context.
//
// TODO ensure only one active request per key
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
// make sure to derive a new |ctx| and pass it to children. It's correct to // make sure to derive a new |ctx| and pass it to children. It's correct to
...@@ -95,26 +93,36 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -95,26 +93,36 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
log.Event(ctx, "GetBlockRequestBegin", &k) log.Event(ctx, "GetBlockRequestBegin", &k)
defer log.Event(ctx, "GetBlockRequestEnd", &k) defer log.Event(ctx, "GetBlockRequestEnd", &k)
promise := bs.notifications.Subscribe(ctx, k) promise, err := bs.GetBlocks(parent, []u.Key{k})
if err != nil {
select { return nil, err
case bs.batchRequests <- []u.Key{k}:
case <-parent.Done():
return nil, parent.Err()
} }
select { select {
case block := <-promise: case block := <-promise:
bs.wantlist.Remove(k)
return &block, nil return &block, nil
case <-parent.Done(): case <-parent.Done():
return nil, parent.Err() return nil, parent.Err()
} }
} }
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) { // GetBlocks returns a channel where the caller may receive blocks that
// TODO: something smart // correspond to the provided |keys|. Returns an error if BitSwap is unable to
return nil, nil // begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan blocks.Block, error) {
// TODO log the request
promise := bs.notifications.Subscribe(ctx, keys...)
select {
case bs.batchRequests <- keys:
return promise, nil
case <-ctx.Done():
return nil, ctx.Err()
}
} }
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
...@@ -155,6 +163,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e ...@@ -155,6 +163,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
return nil return nil
} }
// TODO ensure only one active request per key
func (bs *bitswap) run(ctx context.Context) { func (bs *bitswap) run(ctx context.Context) {
// Every so often, we should resend out our current want list // Every so often, we should resend out our current want list
...@@ -238,6 +247,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -238,6 +247,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
continue // FIXME(brian): err ignored continue // FIXME(brian): err ignored
} }
bs.notifications.Publish(block) bs.notifications.Publish(block)
bs.wantlist.Remove(block.Key())
err := bs.HasBlock(ctx, block) err := bs.HasBlock(ctx, block)
if err != nil { if err != nil {
log.Warningf("HasBlock errored: %s", err) log.Warningf("HasBlock errored: %s", err)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment