Commit 63c28bcc authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #54 from ipfs/kevina/faster-query

add non-channel query iterator method
parents e241bd06 258aec48
......@@ -4,9 +4,9 @@ import (
"path"
"strings"
"github.com/satori/go.uuid"
dsq "github.com/ipfs/go-datastore/query"
"github.com/satori/go.uuid"
)
/*
......
......@@ -60,20 +60,21 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return nil, err
}
ch := make(chan dsq.Result, cap(qr.Next()))
go func() {
defer close(ch)
defer qr.Close()
for r := range qr.Next() {
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync()
if !ok {
return r, false
}
if r.Error == nil {
r.Entry.Key = d.InvertKey(ds.RawKey(r.Entry.Key)).String()
}
ch <- r
}
}()
return dsq.DerivedResults(qr, ch), nil
return r, true
},
Close: func() error {
return qr.Close()
},
}), nil
}
func (d *ktds) Close() error {
......
......@@ -61,29 +61,29 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, err
}
ch := make(chan dsq.Result, cap(qr.Next()))
go func() {
defer close(ch)
defer qr.Close()
for r := range qr.Next() {
if r.Error != nil {
ch <- r
continue
}
k := ds.RawKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
continue
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
for {
r, ok := qr.NextSync()
if !ok {
return r, false
}
if r.Error != nil {
return r, true
}
k := ds.RawKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
continue
}
r.Entry.Key = d.Datastore.InvertKey(k).String()
return r, true
}
r.Entry.Key = d.Datastore.InvertKey(k).String()
ch <- r
}
}()
return dsq.DerivedResults(qr, ch), nil
},
Close: func() error {
return qr.Close()
},
}), nil
}
func (d *datastore) Batch() (ds.Batch, error) {
......
......@@ -106,10 +106,11 @@ type Result struct {
// }
//
type Results interface {
Query() Query // the query these Results correspond to
Next() <-chan Result // returns a channel to wait for the next result
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() error // client may call Close to signal early exit
Query() Query // the query these Results correspond to
Next() <-chan Result // returns a channel to wait for the next result
NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
Close() error // client may call Close to signal early exit
// Process returns a goprocess.Process associated with these results.
// most users will not need this function (Close is all they want),
......@@ -129,6 +130,11 @@ func (r *results) Next() <-chan Result {
return r.res
}
// NextSync blocks until a result can be received from the underlying
// channel. The boolean is false once that channel is closed and drained,
// in which case the returned Result is the zero value.
func (r *results) NextSync() (Result, bool) {
	next, more := <-r.res
	return next, more
}
func (r *results) Rest() ([]Entry, error) {
var es []Entry
for e := range r.res {
......@@ -179,10 +185,11 @@ func (rb *ResultBuilder) Results() Results {
}
}
// Buffer sizes selected by NewResultBuilder: it starts from NormalBufSize
// and switches to KeysOnlyBufSize when the query has KeysOnly set.
const (
	// NormalBufSize is the default buffer size.
	NormalBufSize = 1
	// KeysOnlyBufSize is the buffer size used for keys-only queries.
	KeysOnlyBufSize = 128
)
func NewResultBuilder(q Query) *ResultBuilder {
bufSize := 1
bufSize := NormalBufSize
if q.KeysOnly {
bufSize = KeysOnlyBufSize
}
......@@ -248,9 +255,128 @@ func ResultsWithEntries(q Query, res []Entry) Results {
}
func ResultsReplaceQuery(r Results, q Query) Results {
return &results{
switch r := r.(type) {
case *results:
// note: not using field names to make sure all fields are copied
return &results{q, r.proc, r.res}
case *resultsIter:
// note: not using field names to make sure all fields are copied
lr := r.legacyResults
if lr != nil {
lr = &results{q, lr.proc, lr.res}
}
return &resultsIter{q, r.next, r.close, lr}
default:
panic("unknown results type")
}
}
// ResultsFromIterator provides an alternative way to construct
// results without the use of channels.
//
// q is the query reported by the returned Results. iter.Next is called to
// obtain each result and reports false when there are no more. iter.Close
// may be nil, in which case a no-op is used in its place.
func ResultsFromIterator(q Query, iter Iterator) Results {
	if iter.Close == nil {
		iter.Close = noopClose
	}
	return &resultsIter{
		query: q,
		next:  iter.Next,
		close: iter.Close,
	}
}
// noopClose is the Close callback substituted when an Iterator is supplied
// without one; it does nothing and always succeeds.
func noopClose() error { return nil }
// Iterator is the pair of callbacks from which ResultsFromIterator builds a
// Results without going through channels.
type Iterator struct {
// Next returns the next result; the boolean is false once there are no
// more results to return.
Next func() (Result, bool)
// Close releases whatever the iterator holds. It may be left nil, in
// which case ResultsFromIterator substitutes a no-op.
Close func() error // note: might be called more than once
}
// resultsIter is the Results implementation returned by ResultsFromIterator.
// It pulls results directly from next; once a channel-based method (Next or
// Process) is used, useLegacyResults fills in legacyResults and the other
// methods delegate to it.
type resultsIter struct {
// query is the value returned by Query.
query Query
// next is the iterator's Next callback.
next func() (Result, bool)
// close is the iterator's Close callback.
close func() error
// legacyResults is nil while results are still pulled directly from next.
legacyResults *results
}
// Next returns a channel of results. Calling it switches r over to the
// channel-backed implementation (see useLegacyResults) and hands back that
// implementation's channel.
func (r *resultsIter) Next() <-chan Result {
	r.useLegacyResults()
	legacy := r.legacyResults
	return legacy.Next()
}
// NextSync returns the next result and whether one was available.
//
// If the channel-backed implementation has already been set up, the call is
// delegated to it. Otherwise the iterator's next callback is invoked
// directly; when it reports exhaustion the close callback is run before
// returning, and any error from that callback is not reported.
func (r *resultsIter) NextSync() (Result, bool) {
	if legacy := r.legacyResults; legacy != nil {
		return legacy.NextSync()
	}

	res, more := r.next()
	if more {
		return res, true
	}
	r.close()
	return res, false
}
// Rest collects the remaining entries by calling NextSync until it reports
// exhaustion. If a result carries an error, collection stops and the entries
// gathered so far are returned together with that error.
func (r *resultsIter) Rest() ([]Entry, error) {
	var entries []Entry
	for res, more := r.NextSync(); more; res, more = r.NextSync() {
		if err := res.Error; err != nil {
			return entries, err
		}
		entries = append(entries, res.Entry)
	}
	return entries, nil
}
// Process returns the goprocess.Process of the channel-backed
// implementation, setting that implementation up first if necessary.
func (r *resultsIter) Process() goprocess.Process {
	r.useLegacyResults()
	legacy := r.legacyResults
	return legacy.Process()
}
// Close ends iteration. When the channel-backed implementation is in use
// its Close is called; otherwise the iterator's close callback is invoked
// directly and its error returned.
func (r *resultsIter) Close() error {
	if legacy := r.legacyResults; legacy != nil {
		return legacy.Close()
	}
	return r.close()
}
// Query returns the query these results were created with.
func (r *resultsIter) Query() Query { return r.query }
// useLegacyResults switches r to a channel-backed implementation. It is a
// no-op if that has already happened. Otherwise it creates a ResultBuilder
// for r's query and starts a worker that feeds every result from the
// iterator into the builder's Output channel, then stores the builder's
// Results in r.legacyResults.
func (r *resultsIter) useLegacyResults() {
	if r.legacyResults != nil {
		return
	}

	builder := NewResultBuilder(r.query)

	// go consume all the entries and add them to the results.
	builder.Process.Go(func(worker goprocess.Process) {
		// The iterator's close callback runs whichever way the worker exits.
		defer r.close()
		for {
			res, more := r.next()
			if !more {
				return
			}
			select {
			case builder.Output <- res:
			case <-worker.Closing(): // client told us to close early
				return
			}
		}
	})
	go builder.Process.CloseAfterChildren()

	r.legacyResults = builder.Results().(*results)
}
......@@ -3,6 +3,7 @@ package query
import (
"strings"
"testing"
"reflect"
)
var sampleKeys = []string{
......@@ -107,3 +108,80 @@ func TestOffset(t *testing.T) {
"/ab",
})
}
// TestResultsFromIterator reads an iterator-backed Results through NextSync
// and checks both the keys returned and that Close ran exactly once.
func TestResultsFromIterator(t *testing.T) {
	testResultsFromIteratorWClose(t, getKeysViaNextSync)
}
// TestResultsFromIteratorUsingChan reads an iterator-backed Results through
// the Next channel and checks both the keys returned and that Close ran
// exactly once.
func TestResultsFromIteratorUsingChan(t *testing.T) {
	testResultsFromIteratorWClose(t, getKeysViaChan)
}
// TestResultsFromIteratorUsingRest reads an iterator-backed Results through
// Rest and checks both the keys returned and that Close ran exactly once.
func TestResultsFromIteratorUsingRest(t *testing.T) {
	testResultsFromIteratorWClose(t, getKeysViaRest)
}
func TestResultsFromIteratorNoClose(t *testing.T) {
testResultsFromIterator(t, getKeysViaNextSync, nil)
testResultsFromIterator(t, getKeysViaChan, nil)
}
// testResultsFromIterator builds a Results from an iterator that yields
// sampleKeys in order, reads it back with getKeys, and reports a test error
// if the keys differ from sampleKeys. close is passed through as the
// iterator's Close callback and may be nil.
func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func() error) {
	pos := 0
	next := func() (Result, bool) {
		if pos >= len(sampleKeys) {
			return Result{}, false
		}
		key := sampleKeys[pos]
		pos++
		return Result{Entry: Entry{Key: key}}, true
	}

	rs := ResultsFromIterator(Query{}, Iterator{Next: next, Close: close})
	if got := getKeys(rs); !reflect.DeepEqual(sampleKeys, got) {
		t.Errorf("did not get the same set of keys")
	}
}
// testResultsFromIteratorWClose runs testResultsFromIterator with a Close
// callback that counts its invocations, then reports a test error unless
// it was invoked exactly once.
func testResultsFromIteratorWClose(t *testing.T, getKeys func(rs Results) []string) {
	var closeCalled int
	countClose := func() error {
		closeCalled++
		return nil
	}

	testResultsFromIterator(t, getKeys, countClose)

	if closeCalled != 1 {
		t.Errorf("close called %d times, expect it to be called just once", closeCalled)
	}
}
// getKeysViaNextSync drains rs with NextSync and returns the key of every
// result, in the order received.
func getKeysViaNextSync(rs Results) []string {
	keys := make([]string, 0)
	for r, more := rs.NextSync(); more; r, more = rs.NextSync() {
		keys = append(keys, r.Key)
	}
	return keys
}
// getKeysViaRest drains rs with Rest and returns the key of every entry it
// produced. The error returned by Rest is discarded; only the entries are
// used.
func getKeysViaRest(rs Results) []string {
	entries, _ := rs.Rest()
	keys := make([]string, 0)
	for i := range entries {
		keys = append(keys, entries[i].Key)
	}
	return keys
}
// getKeysViaChan drains the channel returned by rs.Next until it is closed
// and returns the key of every result, in the order received.
func getKeysViaChan(rs Results) []string {
	keys := make([]string, 0)
	ch := rs.Next()
	for r := range ch {
		keys = append(keys, r.Key)
	}
	return keys
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment