Commit b503c740 authored by Steven Allen's avatar Steven Allen

optimize reverse-sorted query

parent 253c31fb
......@@ -6,7 +6,6 @@ import (
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
......@@ -114,25 +113,29 @@ func (a *accessor) Delete(key ds.Key) (err error) {
}
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return a.queryNew(q)
}
func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
return a.queryOrig(q)
}
var rnge *util.Range
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
q.Prefix = ""
}
i := a.ldb.NewIterator(rnge, nil)
return dsq.ResultsFromIterator(q, dsq.Iterator{
next := i.Next
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
q.Orders = nil
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
next = func() bool {
next = i.Prev
return i.Last()
}
q.Orders = nil
default:
}
}
r := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
ok := i.Next()
if !ok {
if !next() {
return dsq.Result{}, false
}
k := string(i.Key())
......@@ -149,86 +152,8 @@ func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
i.Release()
return nil
},
}), nil
}
func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
//
// run query in own sub-process tied to Results.Process(), so that
// it waits for us to finish AND so that clients can signal to us
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
a.runQuery(worker, qrb)
})
// go wait on the worker (without signaling close)
go qrb.Process.CloseAfterChildren()
// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// Default ordering
default:
qr = dsq.NaiveOrder(qr, q.Orders...)
}
}
return qr, nil
}
func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := a.ldb.NewIterator(rnge, nil)
defer i.Release()
// advance iterator for offset
if qrb.Query.Offset > 0 {
for j := 0; j < qrb.Query.Offset; j++ {
i.Next()
}
}
// iterate, and handle limit, too
for sent := 0; i.Next(); sent++ {
// end early if we hit the limit
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
}
k := string(i.Key())
e := dsq.Entry{Key: k}
if !qrb.Query.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
e.Value = buf
}
select {
case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
case <-worker.Closing(): // client told us to end early.
break
}
}
if err := i.Error(); err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}: // client read our error
case <-worker.Closing(): // client told us to end.
return
}
}
return dsq.NaiveQueryApply(q, r), nil
}
// DiskUsage returns the current disk size used by this levelDB.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment