Commit 7194f47f authored by Steven Allen's avatar Steven Allen

fix the keytransform datastore's query implementation

The namespace Datastore shouldn't have to touch the mess with the query.

Note: this _also_ correctly applies ordering, filters, etc. to the transformed
keys (in both the namespaced and the key transform datastores).
parent d28fbf64
......@@ -11,19 +11,3 @@ type KeyTransform interface {
ConvertKey(ds.Key) ds.Key
InvertKey(ds.Key) ds.Key
}
// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}
func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}
func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}
var _ KeyTransform = (*Pair)(nil)
......@@ -61,14 +61,16 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
// Query implements Query, inverting keys on the way back out.
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q)
nq, cq := d.prepareQuery(q)
cqr, err := d.child.Query(cq)
if err != nil {
return nil, err
}
return dsq.ResultsFromIterator(q, dsq.Iterator{
qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync()
r, ok := cqr.NextSync()
if !ok {
return r, false
}
......@@ -78,9 +80,101 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return r, true
},
Close: func() error {
return qr.Close()
return cqr.Close()
},
}), nil
})
return dsq.NaiveQueryApply(nq, qr), nil
}
// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *Datastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
// First, put everything in the child query. Then, start taking things
// out.
child = q
// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()
// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}
// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
if orderPreserving {
child.Orders = child.Orders[:i+1]
break orders
}
}
// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
// Try to let the child handle the filters.
// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)
for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}
// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}
func (d *Datastore) Close() error {
......
package keytransform
import ds "github.com/ipfs/go-datastore"
// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}
func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}
func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}
var _ KeyTransform = (*Pair)(nil)
// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
type PrefixTransform struct {
Prefix ds.Key
}
// ConvertKey adds the prefix.
func (p PrefixTransform) ConvertKey(k ds.Key) ds.Key {
return p.Prefix.Child(k)
}
// InvertKey removes the prefix. panics if prefix not found.
func (p PrefixTransform) InvertKey(k ds.Key) ds.Key {
if p.Prefix.String() == "/" {
return k
}
if !p.Prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}
s := k.String()[len(p.Prefix.String()):]
return ds.RawKey(s)
}
var _ KeyTransform = (*PrefixTransform)(nil)
......@@ -3,7 +3,6 @@ package namespace
import (
ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
dsq "github.com/ipfs/go-datastore/query"
)
// PrefixTransform constructs a KeyTransform with a pair of functions that
......@@ -11,76 +10,17 @@ import (
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
func PrefixTransform(prefix ds.Key) ktds.KeyTransform {
return &ktds.Pair{
// Convert adds the prefix
Convert: func(k ds.Key) ds.Key {
return prefix.Child(k)
},
// Invert removes the prefix. panics if prefix not found.
Invert: func(k ds.Key) ds.Key {
if prefix.String() == "/" {
return k
}
if !prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}
s := k.String()[len(prefix.String()):]
return ds.RawKey(s)
},
}
//
// DEPRECATED: Use ktds.PrefixTransform directly.
func PrefixTransform(prefix ds.Key) ktds.PrefixTransform {
return ktds.PrefixTransform{Prefix: prefix}
}
// Wrap wraps a given datastore with a key-prefix.
func Wrap(child ds.Datastore, prefix ds.Key) *Datastore {
func Wrap(child ds.Datastore, prefix ds.Key) *ktds.Datastore {
if child == nil {
panic("child (ds.Datastore) is nil")
}
d := ktds.Wrap(child, PrefixTransform(prefix))
return &Datastore{Datastore: d, raw: child, prefix: prefix}
}
type Datastore struct {
*ktds.Datastore
prefix ds.Key
raw ds.Datastore
}
// Query implements Query, inverting keys on the way back out.
// This function assumes that child datastore.Query returns ordered results
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
q.Prefix = d.prefix.Child(ds.NewKey(q.Prefix)).String()
qr, err := d.raw.Query(q)
if err != nil {
return nil, err
}
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
for {
r, ok := qr.NextSync()
if !ok {
return r, false
}
if r.Error != nil {
return r, true
}
k := ds.RawKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
return dsq.Result{}, false
}
r.Entry.Key = d.Datastore.InvertKey(k).String()
return r, true
}
},
Close: func() error {
return qr.Close()
},
}), nil
return ktds.Wrap(child, PrefixTransform(prefix))
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment