Unverified Commit dc81160a authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #51 from ipfs/fix/query-goroutine-leak

fix a goroutine leak killing the gateways
parents bbeee3c3 3c828de2
...@@ -7,7 +7,7 @@ sudo: false ...@@ -7,7 +7,7 @@ sudo: false
language: go language: go
go: go:
- 1.9.x - 1.11.x
install: install:
- make deps - make deps
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
) )
...@@ -631,18 +632,25 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -631,18 +632,25 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) {
return nil, errors.New("flatfs only supports listing all keys in random order") return nil, errors.New("flatfs only supports listing all keys in random order")
} }
reschan := make(chan query.Result, query.KeysOnlyBufSize) // Replicates the logic in ResultsWithChan but actually respects calls
go func() { // to `Close`.
defer close(reschan) b := query.NewResultBuilder(q)
err := fs.walkTopLevel(fs.path, reschan) b.Process.Go(func(p goprocess.Process) {
if err != nil { err := fs.walkTopLevel(fs.path, b)
reschan <- query.Result{Error: errors.New("walk failed: " + err.Error())} if err == nil {
return
} }
}() select {
return query.ResultsWithChan(q, reschan), nil case b.Output <- query.Result{Error: errors.New("walk failed: " + err.Error())}:
case <-p.Closing():
}
})
go b.Process.CloseAfterChildren()
return b.Results(), nil
} }
func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error { func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path) dir, err := os.Open(path)
if err != nil { if err != nil {
return err return err
...@@ -653,16 +661,21 @@ func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error ...@@ -653,16 +661,21 @@ func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error
return err return err
} }
for _, dir := range names { for _, dir := range names {
if len(dir) == 0 || dir[0] == '.' { if len(dir) == 0 || dir[0] == '.' {
continue continue
} }
err = fs.walk(filepath.Join(path, dir), reschan) err = fs.walk(filepath.Join(path, dir), result)
if err != nil { if err != nil {
return err return err
} }
// Are we closing?
select {
case <-result.Process.Closing():
return nil
default:
}
} }
return nil return nil
} }
...@@ -957,7 +970,7 @@ func (fs *Datastore) Accuracy() string { ...@@ -957,7 +970,7 @@ func (fs *Datastore) Accuracy() string {
return string(fs.storedValue.Accuracy) return string(fs.storedValue.Accuracy)
} }
func (fs *Datastore) walk(path string, reschan chan query.Result) error { func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path) dir, err := os.Open(path)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
...@@ -993,10 +1006,14 @@ func (fs *Datastore) walk(path string, reschan chan query.Result) error { ...@@ -993,10 +1006,14 @@ func (fs *Datastore) walk(path string, reschan chan query.Result) error {
continue continue
} }
reschan <- query.Result{ select {
case result.Output <- query.Result{
Entry: query.Entry{ Entry: query.Entry{
Key: key.String(), Key: key.String(),
}, },
}:
case <-result.Process.Closing():
return nil
} }
} }
return nil return nil
......
...@@ -988,3 +988,34 @@ func BenchmarkBatchedPut(b *testing.B) { ...@@ -988,3 +988,34 @@ func BenchmarkBatchedPut(b *testing.B) {
} }
b.StopTimer() // avoid counting cleanup b.StopTimer() // avoid counting cleanup
} }
func TestQueryLeak(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
defer fs.Close()
for i := 0; i < 1000; i++ {
err = fs.Put(datastore.NewKey(fmt.Sprint(i)), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
}
before := runtime.NumGoroutine()
for i := 0; i < 200; i++ {
res, err := fs.Query(query.Query{KeysOnly: true})
if err != nil {
t.Errorf("Query fail: %v\n", err)
}
res.Close()
}
after := runtime.NumGoroutine()
if after-before > 100 {
t.Errorf("leaked %d goroutines", after-before)
}
}
...@@ -17,6 +17,12 @@ ...@@ -17,6 +17,12 @@
"hash": "QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D", "hash": "QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D",
"name": "go-datastore", "name": "go-datastore",
"version": "3.4.0" "version": "3.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP",
"name": "goprocess",
"version": "1.0.0"
} }
], ],
"gxVersion": "0.8.0", "gxVersion": "0.8.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment