Unverified Commit 47a96270 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #28 from ipfs/feat/fast-reverse-query

Fast reverse query
parents 253c31fb 272a284b
......@@ -10,7 +10,6 @@ env:
global:
- GOTFLAGS="-race -cpu=5"
matrix:
- BUILD_DEPTYPE=gx
- BUILD_DEPTYPE=gomod
......@@ -24,7 +23,6 @@ script:
cache:
directories:
- $GOPATH/src/gx
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
......
......@@ -6,7 +6,6 @@ import (
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
......@@ -114,25 +113,33 @@ func (a *accessor) Delete(key ds.Key) (err error) {
}
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return a.queryNew(q)
}
func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
return a.queryOrig(q)
}
var rnge *util.Range
// make a copy of the query for the fallback naive query implementation.
// don't modify the original so res.Query() returns the correct results.
qNaive := q
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
qNaive.Prefix = ""
}
i := a.ldb.NewIterator(rnge, nil)
return dsq.ResultsFromIterator(q, dsq.Iterator{
next := i.Next
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
qNaive.Orders = nil
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
next = func() bool {
next = i.Prev
return i.Last()
}
qNaive.Orders = nil
default:
}
}
r := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
ok := i.Next()
if !ok {
if !next() {
return dsq.Result{}, false
}
k := string(i.Key())
......@@ -149,86 +156,8 @@ func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
i.Release()
return nil
},
}), nil
}
func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
//
// run query in own sub-process tied to Results.Process(), so that
// it waits for us to finish AND so that clients can signal to us
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
a.runQuery(worker, qrb)
})
// go wait on the worker (without signaling close)
go qrb.Process.CloseAfterChildren()
// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// Default ordering
default:
qr = dsq.NaiveOrder(qr, q.Orders...)
}
}
return qr, nil
}
func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := a.ldb.NewIterator(rnge, nil)
defer i.Release()
// advance iterator for offset
if qrb.Query.Offset > 0 {
for j := 0; j < qrb.Query.Offset; j++ {
i.Next()
}
}
// iterate, and handle limit, too
for sent := 0; i.Next(); sent++ {
// end early if we hit the limit
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
}
k := string(i.Key())
e := dsq.Entry{Key: k}
if !qrb.Query.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
e.Value = buf
}
select {
case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
case <-worker.Closing(): // client told us to end early.
break
}
}
if err := i.Error(); err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}: // client read our error
case <-worker.Closing(): // client told us to end.
return
}
}
return dsq.NaiveQueryApply(qNaive, r), nil
}
// DiskUsage returns the current disk size used by this levelDB.
......
......@@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"sort"
"testing"
ds "github.com/ipfs/go-datastore"
......@@ -101,6 +102,33 @@ func testQuery(t *testing.T, d *Datastore) {
"/a/b/d",
"/a/c",
}, rs)
// test order
rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKey{}}})
if err != nil {
t.Fatal(err)
}
keys := make([]string, 0, len(testcases))
for k := range testcases {
keys = append(keys, k)
}
sort.Strings(keys)
expectOrderedMatches(t, keys, rs)
rs, err = d.Query(dsq.Query{Orders: []dsq.Order{dsq.OrderByKeyDescending{}}})
if err != nil {
t.Fatal(err)
}
// reverse
for i, j := 0, len(keys)-1; i < j; i, j = i+1, j-1 {
keys[i], keys[j] = keys[j], keys[i]
}
expectOrderedMatches(t, keys, rs)
}
func TestQuery(t *testing.T) {
......@@ -125,6 +153,7 @@ func TestQueryRespectsProcessMem(t *testing.T) {
}
func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
t.Helper()
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
......@@ -146,6 +175,23 @@ func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
}
}
func expectOrderedMatches(t *testing.T, expect []string, actualR dsq.Results) {
t.Helper()
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
}
if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}
for i := range expect {
if expect[i] != actual[i].Key {
t.Errorf("expected %q, got %q", expect[i], actual[i].Key)
}
}
}
func testBatching(t *testing.T, d *Datastore) {
b, err := d.Batch()
if err != nil {
......
{
"author": "whyrusleeping",
"bugs": {
"url": "https://github.com/ipfs/go-ds-leveldb"
},
"gx": {
"dvcsimport": "github.com/ipfs/go-ds-leveldb"
},
"gxDependencies": [
{
"author": "syndtr",
"hash": "QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g",
"name": "goleveldb",
"version": "0.0.1"
},
{
"author": "whyrusleeping",
"hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP",
"name": "goprocess",
"version": "1.0.0"
},
{
"author": "jbenet",
"hash": "QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9",
"name": "go-datastore",
"version": "3.6.1"
}
],
"gxVersion": "0.8.0",
"language": "go",
"license": "",
"name": "go-ds-leveldb",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.3.0"
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment