Commit 42a00bc0 authored by Steven Allen's avatar Steven Allen

mount: add sorting support

parent 8e1699b5
......@@ -3,6 +3,7 @@
package mount
import (
"container/heap"
"errors"
"fmt"
"sort"
......@@ -49,6 +50,102 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}
type queryResults struct {
mount ds.Key
results query.Results
next query.Result
}
func (qr *queryResults) advance() bool {
if qr.results == nil {
return false
}
qr.next = query.Result{}
r, more := qr.results.NextSync()
if !more {
err := qr.results.Close()
qr.results = nil
if err != nil {
// One more result, the error.
qr.next = query.Result{Error: err}
return true
}
return false
}
r.Key = qr.mount.Child(ds.RawKey(r.Key)).String()
qr.next = r
return true
}
type querySet struct {
order []query.Order
heads []*queryResults
}
func (h *querySet) Len() int {
return len(h.heads)
}
func (h *querySet) Less(i, j int) bool {
return query.Less(h.order, h.heads[i].next.Entry, h.heads[j].next.Entry)
}
func (h *querySet) Swap(i, j int) {
h.heads[i], h.heads[j] = h.heads[j], h.heads[i]
}
func (h *querySet) Push(x interface{}) {
h.heads = append(h.heads, x.(*queryResults))
}
func (h *querySet) Pop() interface{} {
i := len(h.heads) - 1
last := h.heads[i]
h.heads[i] = nil
h.heads = h.heads[:i]
return last
}
func (h *querySet) close() error {
var errs []error
for _, qr := range h.heads {
err := qr.results.Close()
if err != nil {
errs = append(errs, err)
}
}
h.heads = nil
if len(errs) > 0 {
return errs[0]
}
return nil
}
func (h *querySet) addResults(mount ds.Key, results query.Results) {
r := &queryResults{
results: results,
mount: mount,
}
if r.advance() {
heap.Push(h, r)
}
}
func (h *querySet) next() (query.Result, bool) {
if len(h.heads) == 0 {
return query.Result{}, false
}
next := h.heads[0].next
if h.heads[0].advance() {
heap.Fix(h, 0)
} else {
heap.Remove(h, 0)
}
return next, true
}
// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
......@@ -123,7 +220,6 @@ func (d *Datastore) Delete(key ds.Key) error {
func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is still overly simplistic, but the only callers are
......@@ -133,60 +229,29 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
prefix := ds.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)
// current itorator state
var res query.Results
var mount ds.Key
i := 0
return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool
for try := true; try; try = len(dses) > i {
if res == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
queries := &querySet{
order: q.Orders,
heads: make([]*queryResults, 0, len(dses)),
}
dst := dses[i]
mount = mounts[i]
for i := range dses {
mount := mounts[i]
dstore := dses[i]
rest := rests[i]
q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
qi := q
qi.Prefix = rest.String()
results, err := dstore.Query(qi)
if err != nil {
return query.Result{Error: err}, false
_ = queries.close()
return nil, err
}
res = r
queries.addResults(mount, results)
}
r, more = res.NextSync()
if !more {
err := res.Close()
if err != nil {
return query.Result{Error: err}, false
}
res = nil
i++
more = len(dses) > i
} else {
break
}
}
r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i && res != nil {
return res.Close()
}
return nil
},
return query.ResultsFromIterator(q, query.Iterator{
Next: queries.next,
Close: queries.close,
}), nil
}
......
......@@ -353,14 +353,10 @@ func TestErrQueryClose(t *testing.T) {
m.Put(datastore.NewKey("/baz"), []byte("123"))
qr, err := m.Query(query.Query{})
if err != nil {
t.Fatalf("Query error: %v", err)
}
e, ok := qr.NextSync()
if ok != false || e.Error == nil {
t.Errorf("Query was ok or q.Error was nil")
_, err := m.Query(query.Query{})
if err == nil {
t.Fatal("expected query to fail")
return
}
}
......
......@@ -46,3 +46,21 @@ type OrderByKeyDescending struct{}
func (o OrderByKeyDescending) Compare(a, b Entry) int {
return -strings.Compare(a.Key, b.Key)
}
// Less returns true if a comes before b with the requested orderings.
func Less(orders []Order, a, b Entry) bool {
for _, cmp := range orders {
switch cmp.Compare(a, b) {
case 0:
case -1:
return true
case 1:
return false
}
}
// This gives us a *stable* sort for free. We don't care
// preserving the order from the underlying datastore
// because it's undefined.
return a.Key < b.Key
}
......@@ -97,22 +97,7 @@ func NaiveOrder(qr Results, orders ...Order) Results {
entries = append(entries, e.Entry)
}
sort.Slice(entries, func(i int, j int) bool {
a, b := entries[i], entries[j]
for _, cmp := range orders {
switch cmp.Compare(a, b) {
case 0:
case -1:
return true
case 1:
return false
}
}
// This gives us a *stable* sort for free. We don't care
// preserving the order from the underlying datastore
// because it's undefined.
return a.Key < b.Key
return Less(orders, entries[i], entries[j])
})
for _, e := range entries {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment