Unverified Commit 9db638c1 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #127 from ipfs/fix/ktds

fix the keytransform datastore's query implementation
parents b19d692f ffc9f911
...@@ -11,24 +11,3 @@ type KeyTransform interface { ...@@ -11,24 +11,3 @@ type KeyTransform interface {
ConvertKey(ds.Key) ds.Key ConvertKey(ds.Key) ds.Key
InvertKey(ds.Key) ds.Key InvertKey(ds.Key) ds.Key
} }
// Datastore is a keytransform.Datastore
type Datastore interface {
ds.Shim
KeyTransform
}
// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) *ktds {
if t == nil {
panic("t (KeyTransform) is nil")
}
if child == nil {
panic("child (ds.Datastore) is nil")
}
return &ktds{child: child, KeyTransform: t}
}
...@@ -5,68 +5,72 @@ import ( ...@@ -5,68 +5,72 @@ import (
dsq "github.com/ipfs/go-datastore/query" dsq "github.com/ipfs/go-datastore/query"
) )
type Pair struct { // Wrap wraps a given datastore with a KeyTransform function.
Convert KeyMapping // The resulting wrapped datastore will use the transform on all Datastore
Invert KeyMapping // operations.
} func Wrap(child ds.Datastore, t KeyTransform) *Datastore {
if t == nil {
panic("t (KeyTransform) is nil")
}
func (t *Pair) ConvertKey(k ds.Key) ds.Key { if child == nil {
return t.Convert(k) panic("child (ds.Datastore) is nil")
} }
func (t *Pair) InvertKey(k ds.Key) ds.Key { return &Datastore{child: child, KeyTransform: t}
return t.Invert(k)
} }
// ktds keeps a KeyTransform function // Datastore keeps a KeyTransform function
type ktds struct { type Datastore struct {
child ds.Datastore child ds.Datastore
KeyTransform KeyTransform
} }
// Children implements ds.Shim // Children implements ds.Shim
func (d *ktds) Children() []ds.Datastore { func (d *Datastore) Children() []ds.Datastore {
return []ds.Datastore{d.child} return []ds.Datastore{d.child}
} }
// Put stores the given value, transforming the key first. // Put stores the given value, transforming the key first.
func (d *ktds) Put(key ds.Key, value []byte) (err error) { func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value) return d.child.Put(d.ConvertKey(key), value)
} }
// Get returns the value for given key, transforming the key first. // Get returns the value for given key, transforming the key first.
func (d *ktds) Get(key ds.Key) (value []byte, err error) { func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key)) return d.child.Get(d.ConvertKey(key))
} }
// Has returns whether the datastore has a value for a given key, transforming // Has returns whether the datastore has a value for a given key, transforming
// the key first. // the key first.
func (d *ktds) Has(key ds.Key) (exists bool, err error) { func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
return d.child.Has(d.ConvertKey(key)) return d.child.Has(d.ConvertKey(key))
} }
// GetSize returns the size of the value named by the given key, transforming // GetSize returns the size of the value named by the given key, transforming
// the key first. // the key first.
func (d *ktds) GetSize(key ds.Key) (size int, err error) { func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return d.child.GetSize(d.ConvertKey(key)) return d.child.GetSize(d.ConvertKey(key))
} }
// Delete removes the value for given key // Delete removes the value for given key
func (d *ktds) Delete(key ds.Key) (err error) { func (d *Datastore) Delete(key ds.Key) (err error) {
return d.child.Delete(d.ConvertKey(key)) return d.child.Delete(d.ConvertKey(key))
} }
// Query implements Query, inverting keys on the way back out. // Query implements Query, inverting keys on the way back out.
func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
qr, err := d.child.Query(q) nq, cq := d.prepareQuery(q)
cqr, err := d.child.Query(cq)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return dsq.ResultsFromIterator(q, dsq.Iterator{ qr := dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) { Next: func() (dsq.Result, bool) {
r, ok := qr.NextSync() r, ok := cqr.NextSync()
if !ok { if !ok {
return r, false return r, false
} }
...@@ -76,21 +80,120 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { ...@@ -76,21 +80,120 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return r, true return r, true
}, },
Close: func() error { Close: func() error {
return qr.Close() return cqr.Close()
}, },
}), nil })
return dsq.NaiveQueryApply(nq, qr), nil
} }
func (d *ktds) Close() error { // Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *Datastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
// First, put everything in the child query. Then, start taking things
// out.
child = q
// Always let the child handle the key prefix.
child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()
// Check if the key transform is order-preserving so we can use the
// child datastore's built-in ordering.
orderPreserving := false
switch d.KeyTransform.(type) {
case PrefixTransform, *PrefixTransform:
orderPreserving = true
}
// Try to let the child handle ordering.
orders:
for i, o := range child.Orders {
switch o.(type) {
case dsq.OrderByValue, *dsq.OrderByValue,
dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
// Key doesn't matter.
continue
case dsq.OrderByKey, *dsq.OrderByKey,
dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
// if the key transform preserves order, we can delegate
// to the child datastore.
if orderPreserving {
// When sorting, we compare with the first
// Order, then, if equal, we compare with the
// second Order, etc. However, keys are _unique_
// so we'll never apply any additional orders
// after ordering by key.
child.Orders = child.Orders[:i+1]
break orders
}
}
// Can't handle this order under transform, punt it to a naive
// ordering.
naive.Orders = q.Orders
child.Orders = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
// Try to let the child handle the filters.
// don't modify the original filters.
child.Filters = append([]dsq.Filter(nil), child.Filters...)
for i, f := range child.Filters {
switch f := f.(type) {
case dsq.FilterValueCompare, *dsq.FilterValueCompare:
continue
case dsq.FilterKeyCompare:
child.Filters[i] = dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case *dsq.FilterKeyCompare:
child.Filters[i] = &dsq.FilterKeyCompare{
Op: f.Op,
Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
}
continue
case dsq.FilterKeyPrefix:
child.Filters[i] = dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
case *dsq.FilterKeyPrefix:
child.Filters[i] = &dsq.FilterKeyPrefix{
Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
}
continue
}
// Not a known filter, defer to the naive implementation.
naive.Filters = q.Filters
child.Filters = nil
naive.Offset = q.Offset
child.Offset = 0
naive.Limit = q.Limit
child.Limit = 0
break
}
return
}
func (d *Datastore) Close() error {
return d.child.Close() return d.child.Close()
} }
// DiskUsage implements the PersistentDatastore interface. // DiskUsage implements the PersistentDatastore interface.
func (d *ktds) DiskUsage() (uint64, error) { func (d *Datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.child) return ds.DiskUsage(d.child)
} }
func (d *ktds) Batch() (ds.Batch, error) { func (d *Datastore) Batch() (ds.Batch, error) {
bds, ok := d.child.(ds.Batching) bds, ok := d.child.(ds.Batching)
if !ok { if !ok {
return nil, ds.ErrBatchUnsupported return nil, ds.ErrBatchUnsupported
...@@ -124,23 +227,29 @@ func (t *transformBatch) Commit() error { ...@@ -124,23 +227,29 @@ func (t *transformBatch) Commit() error {
return t.dst.Commit() return t.dst.Commit()
} }
func (d *ktds) Check() error { func (d *Datastore) Check() error {
if c, ok := d.child.(ds.CheckedDatastore); ok { if c, ok := d.child.(ds.CheckedDatastore); ok {
return c.Check() return c.Check()
} }
return nil return nil
} }
func (d *ktds) Scrub() error { func (d *Datastore) Scrub() error {
if c, ok := d.child.(ds.ScrubbedDatastore); ok { if c, ok := d.child.(ds.ScrubbedDatastore); ok {
return c.Scrub() return c.Scrub()
} }
return nil return nil
} }
func (d *ktds) CollectGarbage() error { func (d *Datastore) CollectGarbage() error {
if c, ok := d.child.(ds.GCDatastore); ok { if c, ok := d.child.(ds.GCDatastore); ok {
return c.CollectGarbage() return c.CollectGarbage()
} }
return nil return nil
} }
var _ ds.Datastore = (*Datastore)(nil)
var _ ds.GCDatastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
var _ ds.PersistentDatastore = (*Datastore)(nil)
var _ ds.ScrubbedDatastore = (*Datastore)(nil)
...@@ -20,22 +20,24 @@ type DSSuite struct{} ...@@ -20,22 +20,24 @@ type DSSuite struct{}
var _ = Suite(&DSSuite{}) var _ = Suite(&DSSuite{})
func (ks *DSSuite) TestBasic(c *C) { func testDatastore() {
}
pair := &kt.Pair{ var pair = &kt.Pair{
Convert: func(k ds.Key) ds.Key { Convert: func(k ds.Key) ds.Key {
return ds.NewKey("/abc").Child(k) return ds.NewKey("/abc").Child(k)
}, },
Invert: func(k ds.Key) ds.Key { Invert: func(k ds.Key) ds.Key {
// remove abc prefix // remove abc prefix
l := k.List() l := k.List()
if l[0] != "abc" { if l[0] != "abc" {
panic("key does not have prefix. convert failed?") panic("key does not have prefix. convert failed?")
} }
return ds.KeyWithNamespaces(l[1:]) return ds.KeyWithNamespaces(l[1:])
}, },
} }
func (ks *DSSuite) TestBasic(c *C) {
mpds := dstest.NewTestDatastore(true) mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair) ktds := kt.Wrap(mpds, pair)
...@@ -110,3 +112,15 @@ func strsToKeys(strs []string) []ds.Key { ...@@ -110,3 +112,15 @@ func strsToKeys(strs []string) []ds.Key {
} }
return keys return keys
} }
func TestSuiteDefaultPair(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, pair)
dstest.SubtestAll(t, ktds)
}
func TestSuitePrefixTransform(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
ktds := kt.Wrap(mpds, kt.PrefixTransform{Prefix: ds.NewKey("/foo")})
dstest.SubtestAll(t, ktds)
}
package keytransform
import ds "github.com/ipfs/go-datastore"
// Pair is a convince struct for constructing a key transform.
type Pair struct {
Convert KeyMapping
Invert KeyMapping
}
func (t *Pair) ConvertKey(k ds.Key) ds.Key {
return t.Convert(k)
}
func (t *Pair) InvertKey(k ds.Key) ds.Key {
return t.Invert(k)
}
var _ KeyTransform = (*Pair)(nil)
// PrefixTransform constructs a KeyTransform with a pair of functions that
// add or remove the given prefix key.
//
// Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors.
type PrefixTransform struct {
Prefix ds.Key
}
// ConvertKey adds the prefix.
func (p PrefixTransform) ConvertKey(k ds.Key) ds.Key {
return p.Prefix.Child(k)
}
// InvertKey removes the prefix. panics if prefix not found.
func (p PrefixTransform) InvertKey(k ds.Key) ds.Key {
if p.Prefix.String() == "/" {
return k
}
if !p.Prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}
s := k.String()[len(p.Prefix.String()):]
return ds.RawKey(s)
}
var _ KeyTransform = (*PrefixTransform)(nil)
...@@ -685,3 +685,17 @@ func TestMaintenanceFunctions(t *testing.T) { ...@@ -685,3 +685,17 @@ func TestMaintenanceFunctions(t *testing.T) {
t.Errorf("Unexpected Scrub() error: %s", err) t.Errorf("Unexpected Scrub() error: %s", err)
} }
} }
func TestSuite(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
})
dstest.SubtestAll(t, m)
}
...@@ -3,7 +3,6 @@ package namespace ...@@ -3,7 +3,6 @@ package namespace
import ( import (
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform" ktds "github.com/ipfs/go-datastore/keytransform"
dsq "github.com/ipfs/go-datastore/query"
) )
// PrefixTransform constructs a KeyTransform with a pair of functions that // PrefixTransform constructs a KeyTransform with a pair of functions that
...@@ -11,89 +10,17 @@ import ( ...@@ -11,89 +10,17 @@ import (
// //
// Warning: will panic if prefix not found when it should be there. This is // Warning: will panic if prefix not found when it should be there. This is
// to avoid insidious data inconsistency errors. // to avoid insidious data inconsistency errors.
func PrefixTransform(prefix ds.Key) ktds.KeyTransform { //
return &ktds.Pair{ // DEPRECATED: Use ktds.PrefixTransform directly.
func PrefixTransform(prefix ds.Key) ktds.PrefixTransform {
// Convert adds the prefix return ktds.PrefixTransform{Prefix: prefix}
Convert: func(k ds.Key) ds.Key {
return prefix.Child(k)
},
// Invert removes the prefix. panics if prefix not found.
Invert: func(k ds.Key) ds.Key {
if prefix.String() == "/" {
return k
}
if !prefix.IsAncestorOf(k) {
panic("expected prefix not found")
}
s := k.String()[len(prefix.String()):]
return ds.RawKey(s)
},
}
} }
// Wrap wraps a given datastore with a key-prefix. // Wrap wraps a given datastore with a key-prefix.
func Wrap(child ds.Datastore, prefix ds.Key) *datastore { func Wrap(child ds.Datastore, prefix ds.Key) *ktds.Datastore {
if child == nil { if child == nil {
panic("child (ds.Datastore) is nil") panic("child (ds.Datastore) is nil")
} }
d := ktds.Wrap(child, PrefixTransform(prefix)) return ktds.Wrap(child, PrefixTransform(prefix))
return &datastore{Datastore: d, raw: child, prefix: prefix}
}
type datastore struct {
prefix ds.Key
raw ds.Datastore
ktds.Datastore
}
// Query implements Query, inverting keys on the way back out.
// This function assumes that child datastore.Query returns ordered results
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
q.Prefix = d.prefix.Child(ds.NewKey(q.Prefix)).String()
qr, err := d.raw.Query(q)
if err != nil {
return nil, err
}
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
for {
r, ok := qr.NextSync()
if !ok {
return r, false
}
if r.Error != nil {
return r, true
}
k := ds.RawKey(r.Entry.Key)
if !d.prefix.IsAncestorOf(k) {
return dsq.Result{}, false
}
r.Entry.Key = d.Datastore.InvertKey(k).String()
return r, true
}
},
Close: func() error {
return qr.Close()
},
}), nil
}
// DiskUsage implements the PersistentDatastore interface.
func (d *datastore) DiskUsage() (uint64, error) {
return ds.DiskUsage(d.raw)
}
func (d *datastore) Batch() (ds.Batch, error) {
if bds, ok := d.Datastore.(ds.Batching); ok {
return bds.Batch()
}
return nil, ds.ErrBatchUnsupported
} }
...@@ -135,15 +135,15 @@ func (ks *DSSuite) TestQuery(c *C) { ...@@ -135,15 +135,15 @@ func (ks *DSSuite) TestQuery(c *C) {
c.Check(string(ent.Value), Equals, string(expect[i].Value)) c.Check(string(ent.Value), Equals, string(expect[i].Value))
} }
if err := nsds.Datastore.(ds.CheckedDatastore).Check(); err != dstest.TestError { if err := nsds.Check(); err != dstest.TestError {
c.Errorf("Unexpected Check() error: %s", err) c.Errorf("Unexpected Check() error: %s", err)
} }
if err := nsds.Datastore.(ds.GCDatastore).CollectGarbage(); err != dstest.TestError { if err := nsds.CollectGarbage(); err != dstest.TestError {
c.Errorf("Unexpected CollectGarbage() error: %s", err) c.Errorf("Unexpected CollectGarbage() error: %s", err)
} }
if err := nsds.Datastore.(ds.ScrubbedDatastore).Scrub(); err != dstest.TestError { if err := nsds.Scrub(); err != dstest.TestError {
c.Errorf("Unexpected Scrub() error: %s", err) c.Errorf("Unexpected Scrub() error: %s", err)
} }
} }
...@@ -155,3 +155,9 @@ func strsToKeys(strs []string) []ds.Key { ...@@ -155,3 +155,9 @@ func strsToKeys(strs []string) []ds.Key {
} }
return keys return keys
} }
func TestSuite(t *testing.T) {
mpds := dstest.NewTestDatastore(true)
nsds := ns.Wrap(mpds, ds.NewKey("/foo"))
dstest.SubtestAll(t, nsds)
}
...@@ -38,13 +38,22 @@ type FilterValueCompare struct { ...@@ -38,13 +38,22 @@ type FilterValueCompare struct {
} }
func (f FilterValueCompare) Filter(e Entry) bool { func (f FilterValueCompare) Filter(e Entry) bool {
cmp := bytes.Compare(e.Value, f.Value)
switch f.Op { switch f.Op {
case Equal: case Equal:
return bytes.Equal(f.Value, e.Value) return cmp == 0
case NotEqual: case NotEqual:
return !bytes.Equal(f.Value, e.Value) return cmp != 0
case LessThan:
return cmp < 0
case LessThanOrEqual:
return cmp <= 0
case GreaterThan:
return cmp > 0
case GreaterThanOrEqual:
return cmp >= 0
default: default:
panic(fmt.Errorf("cannot apply op '%s' to interface{}.", f.Op)) panic(fmt.Errorf("unknown operation: %s", f.Op))
} }
} }
......
...@@ -2,6 +2,7 @@ package query ...@@ -2,6 +2,7 @@ package query
import ( import (
"bytes" "bytes"
"sort"
"strings" "strings"
) )
...@@ -64,3 +65,10 @@ func Less(orders []Order, a, b Entry) bool { ...@@ -64,3 +65,10 @@ func Less(orders []Order, a, b Entry) bool {
// because it's undefined. // because it's undefined.
return a.Key < b.Key return a.Key < b.Key
} }
// Sort sorts the given entries using the given orders.
func Sort(orders []Order, entries []Entry) {
sort.Slice(entries, func(i int, j int) bool {
return Less(orders, entries[i], entries[j])
})
}
package query package query
import ( import (
"sort"
goprocess "github.com/jbenet/goprocess" goprocess "github.com/jbenet/goprocess"
) )
...@@ -104,9 +102,8 @@ func NaiveOrder(qr Results, orders ...Order) Results { ...@@ -104,9 +102,8 @@ func NaiveOrder(qr Results, orders ...Order) Results {
} }
} }
sort.Slice(entries, func(i int, j int) bool { Sort(orders, entries)
return Less(orders, entries[i], entries[j])
})
for _, e := range entries { for _, e := range entries {
select { select {
case <-worker.Closing(): case <-worker.Closing():
......
...@@ -4,7 +4,7 @@ import ( ...@@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"math/rand" "math/rand"
"sort" "strings"
"testing" "testing"
dstore "github.com/ipfs/go-datastore" dstore "github.com/ipfs/go-datastore"
...@@ -123,79 +123,192 @@ func SubtestNotFounds(t *testing.T, ds dstore.Datastore) { ...@@ -123,79 +123,192 @@ func SubtestNotFounds(t *testing.T, ds dstore.Datastore) {
} }
} }
func SubtestOrder(t *testing.T, ds dstore.Datastore) {
test := func(orders ...dsq.Order) {
var types []string
for _, o := range orders {
types = append(types, fmt.Sprintf("%T", o))
}
name := strings.Join(types, ">")
t.Run(name, func(t *testing.T) {
subtestQuery(t, ds, dsq.Query{
Orders: orders,
}, func(t *testing.T, input, output []dsq.Entry) {
if len(input) != len(output) {
t.Fatal("got wrong number of keys back")
}
dsq.Sort(orders, input)
for i, e := range output {
if input[i].Key != e.Key {
t.Fatalf("in key output, got %s but expected %s", e.Key, input[i].Key)
}
}
})
})
}
test(dsq.OrderByKey{})
test(new(dsq.OrderByKey))
test(dsq.OrderByKeyDescending{})
test(new(dsq.OrderByKeyDescending))
test(dsq.OrderByValue{})
test(dsq.OrderByValue{}, dsq.OrderByKey{})
test(dsq.OrderByFunction(func(a, b dsq.Entry) int {
return bytes.Compare(a.Value, b.Value)
}))
}
func SubtestManyKeysAndQuery(t *testing.T, ds dstore.Datastore) { func SubtestManyKeysAndQuery(t *testing.T, ds dstore.Datastore) {
var keys []dstore.Key subtestQuery(t, ds, dsq.Query{KeysOnly: true}, func(t *testing.T, input, output []dsq.Entry) {
var keystrs []string if len(input) != len(output) {
var values [][]byte t.Fatal("got wrong number of keys back")
}
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, input)
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, output)
for i, e := range output {
if input[i].Key != e.Key {
t.Fatalf("in key output, got %s but expected %s", e.Key, input[i].Key)
}
}
})
}
// need a custom test filter to test the "fallback" filter case for unknown
// filters.
type testFilter struct{}
func (testFilter) Filter(e dsq.Entry) bool {
return len(e.Key)%2 == 0
}
func SubtestFilter(t *testing.T, ds dstore.Datastore) {
test := func(filters ...dsq.Filter) {
var types []string
for _, o := range filters {
types = append(types, fmt.Sprintf("%T", o))
}
name := strings.Join(types, ">")
t.Run(name, func(t *testing.T) {
subtestQuery(t, ds, dsq.Query{
Filters: filters,
}, func(t *testing.T, input, output []dsq.Entry) {
var exp []dsq.Entry
input:
for _, e := range input {
for _, f := range filters {
if !f.Filter(e) {
continue input
}
}
exp = append(exp, e)
}
if len(exp) != len(output) {
t.Fatalf("got wrong number of keys back: expected %d, got %d", len(exp), len(output))
}
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, exp)
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, output)
for i, e := range output {
if exp[i].Key != e.Key {
t.Fatalf("in key output, got %s but expected %s", e.Key, exp[i].Key)
}
}
})
})
}
test(dsq.FilterKeyCompare{
Op: dsq.Equal,
Key: "/0key0",
})
test(dsq.FilterKeyCompare{
Op: dsq.LessThan,
Key: "/2",
})
test(&dsq.FilterKeyCompare{
Op: dsq.Equal,
Key: "/0key0",
})
test(dsq.FilterKeyPrefix{
Prefix: "/0key0",
})
test(&dsq.FilterKeyPrefix{
Prefix: "/0key0",
})
test(dsq.FilterValueCompare{
Op: dsq.LessThan,
Value: randValue(),
})
test(new(testFilter))
}
func randValue() []byte {
value := make([]byte, 64)
rand.Read(value)
return value
}
func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, check func(t *testing.T, input, output []dsq.Entry)) {
var input []dsq.Entry
count := 100 count := 100
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
s := fmt.Sprintf("%dkey%d", i, i) s := fmt.Sprintf("%dkey%d", i, i)
dsk := dstore.NewKey(s) key := dstore.NewKey(s).String()
keystrs = append(keystrs, dsk.String()) value := randValue()
keys = append(keys, dsk) input = append(input, dsq.Entry{
buf := make([]byte, 64) Key: key,
rand.Read(buf) Value: value,
values = append(values, buf) })
} }
t.Logf("putting %d values", count) t.Logf("putting %d values", count)
for i, k := range keys { for i, e := range input {
err := ds.Put(k, values[i]) err := ds.Put(dstore.RawKey(e.Key), e.Value)
if err != nil { if err != nil {
t.Fatalf("error on put[%d]: %s", i, err) t.Fatalf("error on put[%d]: %s", i, err)
} }
} }
t.Log("getting values back") t.Log("getting values back")
for i, k := range keys { for i, e := range input {
val, err := ds.Get(k) val, err := ds.Get(dstore.RawKey(e.Key))
if err != nil { if err != nil {
t.Fatalf("error on get[%d]: %s", i, err) t.Fatalf("error on get[%d]: %s", i, err)
} }
if !bytes.Equal(val, values[i]) { if !bytes.Equal(val, e.Value) {
t.Fatal("input value didnt match the one returned from Get") t.Fatal("input value didnt match the one returned from Get")
} }
} }
t.Log("querying values") t.Log("querying values")
q := dsq.Query{KeysOnly: true}
resp, err := ds.Query(q) resp, err := ds.Query(q)
if err != nil { if err != nil {
t.Fatal("calling query: ", err) t.Fatal("calling query: ", err)
} }
t.Log("aggregating query results") t.Log("aggregating query results")
var outkeys []string output, err := resp.Rest()
for { if err != nil {
res, ok := resp.NextSync() t.Fatal("query result error: ", err)
if res.Error != nil {
t.Fatal("query result error: ", res.Error)
}
if !ok {
break
}
outkeys = append(outkeys, res.Key)
} }
t.Log("verifying query output") t.Log("verifying query output")
sort.Strings(keystrs) check(t, input, output)
sort.Strings(outkeys)
if len(keystrs) != len(outkeys) {
t.Fatal("got wrong number of keys back")
}
for i, s := range keystrs {
if outkeys[i] != s {
t.Fatalf("in key output, got %s but expected %s", outkeys[i], s)
}
}
t.Log("deleting all keys") t.Log("deleting all keys")
for _, k := range keys { for _, e := range input {
if err := ds.Delete(k); err != nil { if err := ds.Delete(dstore.RawKey(e.Key)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
......
...@@ -13,6 +13,8 @@ import ( ...@@ -13,6 +13,8 @@ import (
var BasicSubtests = []func(t *testing.T, ds dstore.Datastore){ var BasicSubtests = []func(t *testing.T, ds dstore.Datastore){
SubtestBasicPutGet, SubtestBasicPutGet,
SubtestNotFounds, SubtestNotFounds,
SubtestOrder,
SubtestFilter,
SubtestManyKeysAndQuery, SubtestManyKeysAndQuery,
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment