Commit 90f5ec0c authored by Jeromy's avatar Jeromy

make bitswap sub-RPC's timeout (slowly for now)

parent 96e4204f
...@@ -26,6 +26,9 @@ var log = eventlog.Logger("bitswap") ...@@ -26,6 +26,9 @@ var log = eventlog.Logger("bitswap")
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value // TODO: if a 'non-nice' strategy is implemented, consider increasing this value
const maxProvidersPerRequest = 3 const maxProvidersPerRequest = 3
const providerRequestTimeout = time.Second * 10
const hasBlockTimeout = time.Second * 15
// New initializes a BitSwap instance that communicates over the // New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as // provided BitSwapNetwork. This function registers the returned instance as
// the network delegate. // the network delegate.
...@@ -181,7 +184,8 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { ...@@ -181,7 +184,8 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
for _, k := range ks { for _, k := range ks {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k u.Key) {
providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantListTo(ctx, providers)
if err != nil { if err != nil {
...@@ -228,7 +232,8 @@ func (bs *bitswap) loop(parent context.Context) { ...@@ -228,7 +232,8 @@ func (bs *bitswap) loop(parent context.Context) {
// pinning a file, you store and provide all blocks associated with // pinning a file, you store and provide all blocks associated with
// it. Later, this assumption may not hold as true if we implement // it. Later, this assumption may not hold as true if we implement
// newer bitswap strategies. // newer bitswap strategies.
providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantListTo(ctx, providers)
if err != nil { if err != nil {
...@@ -247,8 +252,21 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -247,8 +252,21 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Debugf("Has Block %s", blk.Key()) log.Debugf("Has Block %s", blk.Key())
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key()) var err error
wg := &sync.WaitGroup{}
wg.Add(2)
child, _ := context.WithTimeout(ctx, hasBlockTimeout)
go func() {
bs.sendToPeersThatWant(child, blk)
wg.Done()
}()
go func() {
err = bs.routing.Provide(child, blk.Key())
wg.Done()
}()
wg.Wait()
return err
} }
// receiveBlock handles storing the block in the blockstore and calling HasBlock // receiveBlock handles storing the block in the blockstore and calling HasBlock
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment