Commit e994c05d authored by Steven Allen's avatar Steven Allen

query: fix goroutine leak

Closing a query early could leak the query goroutine if the buffer isn't large
enough to fit the rest of the query.
parent d7ac124b
......@@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
logging "github.com/ipfs/go-log"
)
......@@ -631,18 +632,25 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) {
return nil, errors.New("flatfs only supports listing all keys in random order")
}
reschan := make(chan query.Result, query.KeysOnlyBufSize)
go func() {
defer close(reschan)
err := fs.walkTopLevel(fs.path, reschan)
if err != nil {
reschan <- query.Result{Error: errors.New("walk failed: " + err.Error())}
// Replicates the logic in ResultsWithChan but actually respects calls
// to `Close`.
b := query.NewResultBuilder(q)
b.Process.Go(func(p goprocess.Process) {
err := fs.walkTopLevel(fs.path, b)
if err == nil {
return
}
}()
return query.ResultsWithChan(q, reschan), nil
select {
case b.Output <- query.Result{Error: errors.New("walk failed: " + err.Error())}:
case <-p.Closing():
}
})
go b.Process.CloseAfterChildren()
return b.Results(), nil
}
func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error {
func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path)
if err != nil {
return err
......@@ -653,16 +661,21 @@ func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error
return err
}
for _, dir := range names {
if len(dir) == 0 || dir[0] == '.' {
continue
}
err = fs.walk(filepath.Join(path, dir), reschan)
err = fs.walk(filepath.Join(path, dir), result)
if err != nil {
return err
}
// Are we closing?
select {
case <-result.Process.Closing():
return nil
default:
}
}
return nil
}
......@@ -957,7 +970,7 @@ func (fs *Datastore) Accuracy() string {
return string(fs.storedValue.Accuracy)
}
func (fs *Datastore) walk(path string, reschan chan query.Result) error {
func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
......@@ -993,10 +1006,14 @@ func (fs *Datastore) walk(path string, reschan chan query.Result) error {
continue
}
reschan <- query.Result{
select {
case result.Output <- query.Result{
Entry: query.Entry{
Key: key.String(),
},
}:
case <-result.Process.Closing():
return nil
}
}
return nil
......
......@@ -17,6 +17,12 @@
"hash": "QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D",
"name": "go-datastore",
"version": "3.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP",
"name": "goprocess",
"version": "1.0.0"
}
],
"gxVersion": "0.8.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment