Unverified Commit 0bb48f3b authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #71 from ipfs/fix/query

Fix combining query filters, offsets, and limits
parents a55b83ee fc0ed1cb
......@@ -450,36 +450,50 @@ func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
}
func (t *txn) query(q dsq.Query) (dsq.Results, error) {
prefix := []byte(q.Prefix)
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = !q.KeysOnly
opt.Prefix = []byte(q.Prefix)
// Special case order by key.
orders := q.Orders
if len(orders) > 0 {
// Handle ordering
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// Already ordered by key.
orders = nil
// We order by key by default.
case dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
orders = nil
// Reverse order by key
opt.Reverse = true
}
}
default:
// Ok, we have a weird order we can't handle. Let's
// perform the _base_ query (prefix, filter, etc.), then
// handle sort/offset/limit later.
// Skip the stuff we can't apply.
baseQuery := q
baseQuery.Limit = 0
baseQuery.Offset = 0
baseQuery.Orders = nil
// perform the base query.
res, err := t.query(baseQuery)
if err != nil {
return nil, err
}
txn := t.txn
// fix the query
res = dsq.ResultsReplaceQuery(res, q)
it := txn.NewIterator(opt)
it.Seek(prefix)
// Remove the parts we've already applied.
naiveQuery := q
naiveQuery.Prefix = ""
naiveQuery.Filters = nil
if q.Offset > 0 {
for j := 0; j < q.Offset; j++ {
it.Next()
// Apply the rest of the query
return dsq.NaiveQueryApply(naiveQuery, res), nil
}
}
it := t.txn.NewIterator(opt)
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
t.ds.closeLk.RLock()
closedEarly := false
......@@ -509,16 +523,63 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
defer it.Close()
for sent := 0; it.ValidForPrefix(prefix); sent++ {
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
// All iterators must be started by rewinding.
it.Rewind()
// skip to the offset
for skipped := 0; skipped < q.Offset && it.Valid(); it.Next() {
// On the happy path, we have no filters and we can go
// on our way.
if len(q.Filters) == 0 {
skipped++
continue
}
// On the sad path, we need to apply filters before
// counting the item as "skipped" as the offset comes
// _after_ the filter.
item := it.Item()
k := string(item.Key())
e := dsq.Entry{Key: k}
matches := true
check := func(value []byte) error {
e := dsq.Entry{Key: string(item.Key()), Value: value}
// Only calculate expirations if we need them.
if q.ReturnExpirations {
e.Expiration = expires(item)
}
matches = filter(q.Filters, e)
return nil
}
// Maybe check with the value, only if we need it.
var err error
if q.KeysOnly {
err = check(nil)
} else {
err = item.Value(check)
}
if err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}:
case <-t.ds.closing: // datastore closing.
closedEarly = true
return
case <-worker.Closing(): // client told us to close early
return
}
}
if !matches {
skipped++
}
}
for sent := 0; (q.Limit <= 0 || sent < q.Limit) && it.Valid(); it.Next() {
item := it.Item()
e := dsq.Entry{Key: string(item.Key())}
// Maybe get the value
var result dsq.Result
if !q.KeysOnly {
b, err := item.ValueCopy(nil)
......@@ -533,34 +594,29 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
}
if q.ReturnExpirations {
result.Expiration = time.Unix(int64(item.ExpiresAt()), 0)
result.Expiration = expires(item)
}
// Finally, filter it (unless we're dealing with an error).
if result.Error == nil && filter(q.Filters, e) {
continue
}
select {
case qrb.Output <- result:
sent++
case <-t.ds.closing: // datastore closing.
closedEarly = true
return
case <-worker.Closing(): // client told us to close early
return
}
it.Next()
}
})
go qrb.Process.CloseAfterChildren() //nolint
// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
if len(orders) > 0 {
qr = dsq.NaiveOrder(qr, orders...)
}
return qr, nil
return qrb.Results(), nil
}
func (t *txn) Commit() error {
......@@ -604,3 +660,17 @@ func (t *txn) Discard() {
func (t *txn) discard() {
t.txn.Discard()
}
// filter returns _true_ if we should filter (skip) the entry
func filter(filters []dsq.Filter, entry dsq.Entry) bool {
for _, f := range filters {
if !f.Filter(entry) {
return true
}
}
return false
}
func expires(item *badger.Item) time.Time {
return time.Unix(int64(item.ExpiresAt()), 0)
}
......@@ -12,6 +12,7 @@ import (
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
)
var testcases = map[string]string{
......@@ -821,3 +822,10 @@ func TestExpirations(t *testing.T) {
t.Fatalf("wrong error type: %v", err)
}
}
func TestSuite(t *testing.T) {
d, done := newDS(t)
defer done()
dstest.SubtestAll(t, d)
}
......@@ -2,7 +2,9 @@ module github.com/ipfs/go-ds-badger
require (
github.com/dgraph-io/badger v1.6.0
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-datastore v0.1.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
)
go 1.12
......@@ -10,8 +10,6 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0-rc1 h1:JphPpoBZJ3WHha133BGYlQqltSGIhV+VsEID0++nN9A=
github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
......@@ -29,10 +27,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9fI=
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
......@@ -88,6 +84,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSF
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment