Commit d5f86f30 authored by Steven Allen's avatar Steven Allen

query: avoid ResultsWithChan

It's impossible to cancel this correctly. This patch also tries to avoid
goroutines whenever possible.
parent 80bb555d
...@@ -204,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder { ...@@ -204,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder {
} }
// ResultsWithChan returns a Results object from a channel // ResultsWithChan returns a Results object from a channel
// of Result entries. Respects its own Close() // of Result entries.
//
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
func ResultsWithChan(q Query, res <-chan Result) Results { func ResultsWithChan(q Query, res <-chan Result) Results {
b := NewResultBuilder(q) return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
for { for {
select { select {
case <-worker.Closing(): // client told us to close early case <-worker.Closing(): // client told us to close early
...@@ -220,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results { ...@@ -220,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
} }
select { select {
case b.Output <- e: case out <- e:
case <-worker.Closing(): // client told us to close early case <-worker.Closing(): // client told us to close early
return return
} }
} }
} }
}) })
}
// ResultsWithProcess returns a Results object with the results generated by the
// passed subprocess.
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
b := NewResultBuilder(q)
// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
proc(worker, b.Output)
})
go b.Process.CloseAfterChildren() go b.Process.CloseAfterChildren()
return b.Results() return b.Results()
......
package query package query
import "sort" import (
"sort"
func DerivedResults(qr Results, ch <-chan Result) Results { goprocess "github.com/jbenet/goprocess"
return &results{ )
query: qr.Query(),
proc: qr.Process(),
res: ch,
}
}
// NaiveFilter applies a filter to the results. // NaiveFilter applies a filter to the results.
func NaiveFilter(qr Results, filter Filter) Results { func NaiveFilter(qr Results, filter Filter) Results {
ch := make(chan Result) return ResultsFromIterator(qr.Query(), Iterator{
go func() { Next: func() (Result, bool) {
defer close(ch) for {
defer qr.Close() e, ok := qr.NextSync()
if !ok {
for e := range qr.Next() { return Result{}, false
if e.Error != nil || filter.Filter(e.Entry) { }
ch <- e if e.Error != nil || filter.Filter(e.Entry) {
return e, true
}
} }
} },
}() Close: func() error {
return qr.Close()
return ResultsWithChan(qr.Query(), ch) },
})
} }
// NaiveLimit truncates the results to a given int limit // NaiveLimit truncates the results to a given int limit
func NaiveLimit(qr Results, limit int) Results { func NaiveLimit(qr Results, limit int) Results {
ch := make(chan Result) if limit == 0 {
go func() { // 0 means no limit
defer close(ch) return qr
defer qr.Close() }
closed := false
l := 0 return ResultsFromIterator(qr.Query(), Iterator{
for e := range qr.Next() { Next: func() (Result, bool) {
if e.Error != nil { if limit == 0 {
ch <- e if !closed {
continue closed = true
err := qr.Close()
if err != nil {
return Result{Error: err}, true
}
}
return Result{}, false
} }
ch <- e limit--
l++ return qr.NextSync()
if limit > 0 && l >= limit { },
break Close: func() error {
if closed {
return nil
} }
} closed = true
}() return qr.Close()
},
return ResultsWithChan(qr.Query(), ch) })
} }
// NaiveOffset skips a given number of results // NaiveOffset skips a given number of results
func NaiveOffset(qr Results, offset int) Results { func NaiveOffset(qr Results, offset int) Results {
ch := make(chan Result) return ResultsFromIterator(qr.Query(), Iterator{
go func() { Next: func() (Result, bool) {
defer close(ch) for ; offset > 0; offset-- {
defer qr.Close() res, ok := qr.NextSync()
if !ok || res.Error != nil {
sent := 0 return res, ok
for e := range qr.Next() { }
if e.Error != nil {
ch <- e
}
if sent < offset {
sent++
continue
} }
ch <- e return qr.NextSync()
} },
}() Close: func() error {
return qr.Close()
return ResultsWithChan(qr.Query(), ch) },
})
} }
// NaiveOrder reorders results according to given orders. // NaiveOrder reorders results according to given orders.
...@@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results { ...@@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results {
return qr return qr
} }
ch := make(chan Result) return ResultsWithProcess(qr.Query(), func(worker goprocess.Process, out chan<- Result) {
var entries []Entry
go func() {
defer close(ch)
defer qr.Close() defer qr.Close()
var entries []Entry
for e := range qr.Next() { collect:
if e.Error != nil { for {
ch <- e select {
case <-worker.Closing():
return
case e, ok := <-qr.Next():
if !ok {
break collect
}
if e.Error != nil {
out <- e
continue
}
entries = append(entries, e.Entry)
} }
entries = append(entries, e.Entry)
} }
sort.Slice(entries, func(i int, j int) bool { sort.Slice(entries, func(i int, j int) bool {
return Less(orders, entries[i], entries[j]) return Less(orders, entries[i], entries[j])
}) })
for _, e := range entries { for _, e := range entries {
ch <- Result{Entry: e} select {
case <-worker.Closing():
return
case out <- Result{Entry: e}:
}
} }
}() })
return DerivedResults(qr, ch)
} }
func NaiveQueryApply(q Query, qr Results) Results { func NaiveQueryApply(q Query, qr Results) Results {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment