Commit 1cccee0f authored by Steven Allen's avatar Steven Allen

query: fix a goroutine leak when the routing table is empty

When the routing table is empty, `Run` would fail but _not_ close the
process (leaking some query goroutines). This patch fixes this in multiple
places by:

1. Not starting queries with no peers.
2. Failing queries with no peers earlier.
parent 691b1e5f
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
todoctr "github.com/ipfs/go-todocounter" todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess" process "github.com/jbenet/goprocess"
ctxproc "github.com/jbenet/goprocess/context" ctxproc "github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pset "github.com/libp2p/go-libp2p-peer/peerset" pset "github.com/libp2p/go-libp2p-peer/peerset"
...@@ -58,6 +59,11 @@ type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error) ...@@ -58,6 +59,11 @@ type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
// Run runs the query at hand. pass in a list of peers to use first. // Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
if len(peers) == 0 {
logger.Warning("Running query with no peers!")
return nil, kb.ErrLookupFailure
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
...@@ -121,11 +127,6 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes ...@@ -121,11 +127,6 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
r.log = logger r.log = logger
r.runCtx = ctx r.runCtx = ctx
if len(peers) == 0 {
logger.Warning("Running query with no peers!")
return nil, nil
}
// setup concurrency rate limiting // setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ { for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{} r.rateLimit <- struct{}{}
......
...@@ -495,6 +495,15 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, ...@@ -495,6 +495,15 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
} }
} }
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
if len(peers) == 0 {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: kb.ErrLookupFailure.Error(),
})
return
}
// setup the Query // setup the Query
parent := ctx parent := ctx
query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
...@@ -545,7 +554,6 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, ...@@ -545,7 +554,6 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
return &dhtQueryResult{closerPeers: clpeers}, nil return &dhtQueryResult{closerPeers: clpeers}, nil
}) })
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
_, err := query.Run(ctx, peers) _, err := query.Run(ctx, peers)
if err != nil { if err != nil {
logger.Debugf("Query error: %s", err) logger.Debugf("Query error: %s", err)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment