diff --git a/mount/mount.go b/mount/mount.go
index 9d26f466fb99918b0dbb4e48fc5a308003fc2155..326277a686def8a2da0754a9c410c3acf0e3d21f 100644
--- a/mount/mount.go
+++ b/mount/mount.go
@@ -3,6 +3,7 @@
 package mount
 
 import (
+	"container/heap"
 	"errors"
 	"fmt"
 	"sort"
@@ -49,6 +50,121 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
 	return nil, ds.NewKey("/"), key
 }
 
+// queryResults tracks one mount's child query: the open results iterator and
+// the next pending result, with its key already rewritten into the mount's
+// namespace.
+type queryResults struct {
+	mount   ds.Key
+	results query.Results
+	next    query.Result
+}
+
+// advance loads the following result into qr.next and reports whether one is
+// available. When the child iterator is exhausted it is closed and set to
+// nil; an error from that Close is delivered as one final result.
+func (qr *queryResults) advance() bool {
+	if qr.results == nil {
+		return false
+	}
+
+	qr.next = query.Result{}
+	r, more := qr.results.NextSync()
+	if !more {
+		err := qr.results.Close()
+		qr.results = nil
+		if err != nil {
+			// One more result, the error.
+			qr.next = query.Result{Error: err}
+			return true
+		}
+		return false
+	}
+
+	r.Key = qr.mount.Child(ds.RawKey(r.Key)).String()
+	qr.next = r
+	return true
+}
+
+// querySet merges the per-mount results. It implements heap.Interface, ordered
+// by the query's Orders, so the head of the heap is the next merged result.
+type querySet struct {
+	query query.Query
+	heads []*queryResults
+}
+
+func (h *querySet) Len() int {
+	return len(h.heads)
+}
+
+func (h *querySet) Less(i, j int) bool {
+	return query.Less(h.query.Orders, h.heads[i].next.Entry, h.heads[j].next.Entry)
+}
+
+func (h *querySet) Swap(i, j int) {
+	h.heads[i], h.heads[j] = h.heads[j], h.heads[i]
+}
+
+func (h *querySet) Push(x interface{}) {
+	h.heads = append(h.heads, x.(*queryResults))
+}
+
+func (h *querySet) Pop() interface{} {
+	i := len(h.heads) - 1
+	last := h.heads[i]
+	h.heads[i] = nil
+	h.heads = h.heads[:i]
+	return last
+}
+
+// close closes every child iterator that is still open and drops all heads.
+// Every iterator is closed even if an earlier Close fails; only the first
+// error is returned.
+func (h *querySet) close() error {
+	var firstErr error
+	for _, qr := range h.heads {
+		if qr.results == nil {
+			// advance already closed this iterator; the head is only
+			// still queued to deliver its Close error.
+			continue
+		}
+		if err := qr.results.Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	h.heads = nil
+	return firstErr
+}
+
+// addResults registers a child query's results under mount. A child with no
+// results is not added; advance has already closed it.
+func (h *querySet) addResults(mount ds.Key, results query.Results) {
+	r := &queryResults{
+		results: results,
+		mount:   mount,
+	}
+	if r.advance() {
+		heap.Push(h, r)
+	}
+}
+
+// next pops the smallest pending result across all mounts and refills the heap
+// from the mount it came from. It reports false once every mount is drained.
+func (h *querySet) next() (query.Result, bool) {
+	if len(h.heads) == 0 {
+		return query.Result{}, false
+	}
+	head := h.heads[0]
+	next := head.next
+
+	if head.advance() {
+		heap.Fix(h, 0)
+	} else {
+		heap.Remove(h, 0)
+	}
+
+	return next, true
+}
+
 // lookupAll returns all mounts that might contain keys that are descendant of
 //
 // Matching: /ao/e
@@ -121,73 +237,63 @@ func (d *Datastore) Delete(key ds.Key) error {
 	return cds.Delete(k)
 }
 
-func (d *Datastore) Query(q query.Query) (query.Results, error) {
-	if len(q.Filters) > 0 ||
-		len(q.Orders) > 0 ||
-		q.Limit > 0 ||
-		q.Offset > 0 {
-		// TODO this is still overly simplistic, but the only callers are
-		// `ipfs refs local` and ipfs-ds-convert.
-		return nil, errors.New("mount only supports listing all prefixed keys in random order")
+// Query runs master against every mount that may hold matching keys and
+// merges the per-mount results according to master.Orders. Filters, Offset
+// and Limit are applied to the merged stream.
+func (d *Datastore) Query(master query.Query) (query.Results, error) {
+	// Limit is deliberately not pushed down: a per-mount limit would drop
+	// entries before the master Filters and Offset have been applied.
+	childQuery := query.Query{
+		Prefix:            master.Prefix,
+		Orders:            master.Orders,
+		KeysOnly:          master.KeysOnly,
+		ReturnExpirations: master.ReturnExpirations,
 	}
-	prefix := ds.NewKey(q.Prefix)
+
+	prefix := ds.NewKey(childQuery.Prefix)
 	dses, mounts, rests := d.lookupAll(prefix)
 
-	// current itorator state
-	var res query.Results
-	var mount ds.Key
-	i := 0
-
-	return query.ResultsFromIterator(q, query.Iterator{
-		Next: func() (query.Result, bool) {
-			var r query.Result
-			var more bool
-
-			for try := true; try; try = len(dses) > i {
-				if res == nil {
-					if len(dses) <= i {
-						//This should not happen normally
-						return query.Result{}, false
-					}
-
-					dst := dses[i]
-					mount = mounts[i]
-					rest := rests[i]
-
-					q2 := q
-					q2.Prefix = rest.String()
-					r, err := dst.Query(q2)
-					if err != nil {
-						return query.Result{Error: err}, false
-					}
-					res = r
-				}
-
-				r, more = res.NextSync()
-				if !more {
-					err := res.Close()
-					if err != nil {
-						return query.Result{Error: err}, false
-					}
-					res = nil
-
-					i++
-					more = len(dses) > i
-				} else {
-					break
-				}
-			}
+	queries := &querySet{
+		query: childQuery,
+		heads: make([]*queryResults, 0, len(dses)),
+	}
 
-			r.Key = mount.Child(ds.RawKey(r.Key)).String()
-			return r, more
-		},
-		Close: func() error {
-			if len(mounts) > i && res != nil {
-				return res.Close()
-			}
-			return nil
-		},
-	}), nil
+	for i := range dses {
+		mount := mounts[i]
+		dstore := dses[i]
+		rest := rests[i]
+
+		qi := childQuery
+		qi.Prefix = rest.String()
+		results, err := dstore.Query(qi)
+
+		if err != nil {
+			// Best-effort cleanup of the children opened so far; the
+			// query error is the one worth reporting.
+			_ = queries.close()
+			return nil, err
+		}
+		queries.addResults(mount, results)
+	}
+
+	qr := query.ResultsFromIterator(childQuery, query.Iterator{
+		Next:  queries.next,
+		Close: queries.close,
+	})
+
+	for _, f := range master.Filters {
+		qr = query.NaiveFilter(qr, f)
+	}
+
+	if master.Offset > 0 {
+		qr = query.NaiveOffset(qr, master.Offset)
+	}
+
+	if master.Limit > 0 {
+		qr = query.NaiveLimit(qr, master.Limit)
+	}
+
+	return qr, nil
 }
 
 func (d *Datastore) Close() error {
diff --git a/mount/mount_test.go b/mount/mount_test.go
index 9f63676c0af39ca5de11aebefe3a020b4a38f0f9..03a854e24f8487205ea32c6ad8f1120117c2f08d 100644
--- a/mount/mount_test.go
+++ b/mount/mount_test.go
@@ -4,9 +4,10 @@ import (
 	"errors"
 	"testing"
 
-	"github.com/ipfs/go-datastore"
+	datastore "github.com/ipfs/go-datastore"
 	mount "github.com/ipfs/go-datastore/mount"
-	"github.com/ipfs/go-datastore/query"
+	query "github.com/ipfs/go-datastore/query"
+	sync "github.com/ipfs/go-datastore/sync"
 	dstest "github.com/ipfs/go-datastore/test"
 )
 
@@ -238,7 +239,7 @@ func TestQuerySimple(t *testing.T) {
 	}
 }
 
-func TestQueryCross(t *testing.T) {
+func TestQueryAcrossMounts(t *testing.T) {
 	mapds0 := datastore.NewMapDatastore()
 	mapds1 := datastore.NewMapDatastore()
 	mapds2 := datastore.NewMapDatastore()
@@ -262,6 +263,10 @@ func TestQueryCross(t *testing.T) {
 	}
 	entries, err := res.Rest()
 	if err != nil {
+		// Close under its own name so the Rest error reaches Fatalf below.
+		if cerr := res.Close(); cerr != nil {
+			t.Errorf("result.Close failed %v", cerr)
+		}
 		t.Fatalf("Query Results.Rest fail: %v\n", err)
 	}
 	seen := 0
@@ -297,6 +302,354 @@ func TestQueryCross(t *testing.T) {
 	}
 }
 
+func TestQueryAcrossMountsWithSort(t *testing.T) {
+	mapds0 := datastore.NewMapDatastore()
+	mapds1 := datastore.NewMapDatastore()
+	mapds2 := datastore.NewMapDatastore()
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/boo/5"), Datastore: mapds2},
+		{Prefix: datastore.NewKey("/boo"), Datastore: mapds0},
+	})
+
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/zoo/1"), []byte("234"))
+	m.Put(datastore.NewKey("/boo/9"), []byte("345"))
+	m.Put(datastore.NewKey("/boo/3"), []byte("456"))
+	m.Put(datastore.NewKey("/boo/5/hello"), []byte("789"))
+
+	res, err := m.Query(query.Query{Orders: []query.Order{query.OrderByKey{}}})
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/boo/3",
+		"/boo/5/hello",
+		"/boo/9",
+		"/zoo/0",
+		"/zoo/1",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	for i, e := range expect {
+		if e != entries[i].Key {
+			t.Errorf("expected key %s, but got %s", e, entries[i].Key)
+		}
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryLimitAcrossMountsWithSort(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds3 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+		{Prefix: datastore.NewKey("/noop"), Datastore: mapds3},
+	})
+
+	m.Put(datastore.NewKey("/rok/0"), []byte("ghi"))
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("def"))
+	m.Put(datastore.NewKey("/zoo/1"), []byte("167"))
+	m.Put(datastore.NewKey("/zoo/2"), []byte("345"))
+	m.Put(datastore.NewKey("/rok/3"), []byte("abc"))
+	m.Put(datastore.NewKey("/zoo/3"), []byte("456"))
+
+	q := query.Query{Limit: 2, Orders: []query.Order{query.OrderByKeyDescending{}}}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/zoo/3",
+		"/zoo/2",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	for i, e := range expect {
+		if e != entries[i].Key {
+			t.Errorf("expected key %s, but got %s", e, entries[i].Key)
+		}
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds3 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+		{Prefix: datastore.NewKey("/noop"), Datastore: mapds3},
+	})
+
+	m.Put(datastore.NewKey("/rok/0"), []byte("ghi"))
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("def"))
+	m.Put(datastore.NewKey("/zoo/1"), []byte("167"))
+	m.Put(datastore.NewKey("/zoo/2"), []byte("345"))
+	m.Put(datastore.NewKey("/rok/3"), []byte("abc"))
+	m.Put(datastore.NewKey("/zoo/3"), []byte("456"))
+
+	q := query.Query{Limit: 3, Offset: 2, Orders: []query.Order{query.OrderByKey{}}}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/rok/3",
+		"/zoo/0",
+		"/zoo/1",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	for i, e := range expect {
+		if e != entries[i].Key {
+			t.Errorf("expected key %s, but got %s", e, entries[i].Key)
+		}
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryOffsetBeyondLimitWithSort(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+	})
+
+	m.Put(datastore.NewKey("/rok/0"), []byte("a"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("b"))
+	m.Put(datastore.NewKey("/rok/2"), []byte("c"))
+	m.Put(datastore.NewKey("/rok/3"), []byte("d"))
+
+	// Offset must be applied to the merged stream: a Limit pushed down to
+	// the mounts would discard "/rok/2" before the offset is consumed.
+	q := query.Query{Limit: 1, Offset: 2, Orders: []query.Order{query.OrderByKey{}}}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/rok/2",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	for i, e := range expect {
+		if e != entries[i].Key {
+			t.Errorf("expected key %s, but got %s", e, entries[i].Key)
+		}
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryFilterAcrossMountsWithSort(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds3 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+		{Prefix: datastore.NewKey("/noop"), Datastore: mapds3},
+	})
+
+	m.Put(datastore.NewKey("/rok/0"), []byte("ghi"))
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("def"))
+	m.Put(datastore.NewKey("/zoo/1"), []byte("167"))
+	m.Put(datastore.NewKey("/zoo/2"), []byte("345"))
+	m.Put(datastore.NewKey("/rok/3"), []byte("abc"))
+	m.Put(datastore.NewKey("/zoo/3"), []byte("456"))
+
+	f := &query.FilterKeyCompare{Op: query.Equal, Key: "/rok/3"}
+	q := query.Query{Filters: []query.Filter{f}}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/rok/3",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	for i, e := range expect {
+		if e != entries[i].Key {
+			t.Errorf("expected key %s, but got %s", e, entries[i].Key)
+		}
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryLimitAndOffsetWithNoData(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+	})
+
+	q := query.Query{Limit: 4, Offset: 3}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryLimitWithNotEnoughData(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+	})
+
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("167"))
+
+	q := query.Query{Limit: 4}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{
+		"/zoo/0",
+		"/rok/1",
+	}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
+func TestQueryOffsetWithNotEnoughData(t *testing.T) {
+	mapds1 := sync.MutexWrap(datastore.NewMapDatastore())
+	mapds2 := sync.MutexWrap(datastore.NewMapDatastore())
+	m := mount.New([]mount.Mount{
+		{Prefix: datastore.NewKey("/rok"), Datastore: mapds1},
+		{Prefix: datastore.NewKey("/zoo"), Datastore: mapds2},
+	})
+
+	m.Put(datastore.NewKey("/zoo/0"), []byte("123"))
+	m.Put(datastore.NewKey("/rok/1"), []byte("167"))
+
+	q := query.Query{Offset: 4}
+	res, err := m.Query(q)
+	if err != nil {
+		t.Fatalf("Query fail: %v\n", err)
+	}
+
+	entries, err := res.Rest()
+	if err != nil {
+		t.Fatalf("Query Results.Rest fail: %v\n", err)
+	}
+
+	expect := []string{}
+
+	if len(entries) != len(expect) {
+		t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
+	}
+
+	err = res.Close()
+	if err != nil {
+		t.Errorf("result.Close failed %v", err)
+	}
+}
+
 func TestLookupPrio(t *testing.T) {
 	mapds0 := datastore.NewMapDatastore()
 	mapds1 := datastore.NewMapDatastore()
@@ -353,14 +706,10 @@ func TestErrQueryClose(t *testing.T) {
 
 	m.Put(datastore.NewKey("/baz"), []byte("123"))
 
-	qr, err := m.Query(query.Query{})
-	if err != nil {
-		t.Fatalf("Query error: %v", err)
-	}
-
-	e, ok := qr.NextSync()
-	if ok != false || e.Error == nil {
-		t.Errorf("Query was ok or q.Error was nil")
+	_, err := m.Query(query.Query{})
+	if err == nil {
+		t.Fatal("expected query to fail")
+		return
 	}
 }
 
diff --git a/query/order.go b/query/order.go
index 2ad1c9906d20dc01b02d87f0e343577b1619cd50..f66155461f7325e0bcf67814d0183319fbf1f50c 100644
--- a/query/order.go
+++ b/query/order.go
@@ -46,3 +46,19 @@ type OrderByKeyDescending struct{}
 func (o OrderByKeyDescending) Compare(a, b Entry) int {
 	return -strings.Compare(a.Key, b.Key)
 }
+
+// Less returns true if a comes before b with the requested orderings.
+// Orders are consulted in sequence; the first one that does not consider the
+// entries equal decides. Only the sign of Compare's result is used.
+func Less(orders []Order, a, b Entry) bool {
+	for _, cmp := range orders {
+		if c := cmp.Compare(a, b); c != 0 {
+			return c < 0
+		}
+	}
+
+	// This gives us a *stable* sort for free. We don't care about
+	// preserving the order from the underlying datastore
+	// because it's undefined.
+	return a.Key < b.Key
+}
diff --git a/query/query_impl.go b/query/query_impl.go
index d29b0e32f9e19fab559c0b9cca5ca86f77453258..440513ce94cbd718134f5dbc1aeeaa499b313f6a 100644
--- a/query/query_impl.go
+++ b/query/query_impl.go
@@ -24,7 +24,7 @@ func NaiveFilter(qr Results, filter Filter) Results {
 		}
 	}()
 
-	return DerivedResults(qr, ch)
+	return ResultsWithChan(qr.Query(), ch)
 }
 
 // NaiveLimit truncates the results to a given int limit
@@ -48,7 +48,7 @@ func NaiveLimit(qr Results, limit int) Results {
 		}
 	}()
 
-	return DerivedResults(qr, ch)
+	return ResultsWithChan(qr.Query(), ch)
 }
 
 // NaiveOffset skips a given number of results
@@ -72,7 +72,7 @@ func NaiveOffset(qr Results, offset int) Results {
 		}
 	}()
 
-	return DerivedResults(qr, ch)
+	return ResultsWithChan(qr.Query(), ch)
 }
 
 // NaiveOrder reorders results according to given orders.
@@ -97,22 +97,7 @@ func NaiveOrder(qr Results, orders ...Order) Results {
 			entries = append(entries, e.Entry)
 		}
 		sort.Slice(entries, func(i int, j int) bool {
-			a, b := entries[i], entries[j]
-
-			for _, cmp := range orders {
-				switch cmp.Compare(a, b) {
-				case 0:
-				case -1:
-					return true
-				case 1:
-					return false
-				}
-			}
-
-			// This gives us a *stable* sort for free. We don't care
-			// preserving the order from the underlying datastore
-			// because it's undefined.
-			return a.Key < b.Key
+			return Less(orders, entries[i], entries[j])
 		})
 
 		for _, e := range entries {
@@ -137,7 +122,7 @@ func NaiveQueryApply(q Query, qr Results) Results {
 		qr = NaiveOffset(qr, q.Offset)
 	}
 	if q.Limit != 0 {
-		qr = NaiveLimit(qr, q.Offset)
+		qr = NaiveLimit(qr, q.Limit)
 	}
 	return qr
 }
diff --git a/query/query_test.go b/query/query_test.go
index 28e5301b0ae6792594a597e5f4750ee52bcb22bb..228c69a4d391106dd6e3caa87e9121edda2f3928 100644
--- a/query/query_test.go
+++ b/query/query_test.go
@@ -40,6 +40,70 @@ func testResults(t *testing.T, res Results, expect []string) {
 	}
 }
 
+func TestNaiveQueryApply(t *testing.T) {
+	testNaiveQueryApply := func(t *testing.T, query Query, keys []string, expect []string) {
+		e := make([]Entry, len(keys))
+		for i, k := range keys {
+			e[i] = Entry{Key: k}
+		}
+
+		res := ResultsWithEntries(query, e)
+		res = NaiveQueryApply(query, res)
+
+		testResults(t, res, expect)
+	}
+
+	q := Query{Limit: 2}
+
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/ab/c",
+		"/ab/cd",
+	})
+
+	q = Query{Offset: 3, Limit: 2}
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/abce",
+		"/abcf",
+	})
+
+	f := &FilterKeyCompare{Op: Equal, Key: "/ab"}
+	q = Query{Filters: []Filter{f}}
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/ab",
+	})
+
+	q = Query{Prefix: "/ab"}
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/ab/c",
+		"/ab/cd",
+		"/abce",
+		"/abcf",
+		"/ab",
+	})
+
+	q = Query{Orders: []Order{OrderByKeyDescending{}}}
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/abcf",
+		"/abce",
+		"/ab/cd",
+		"/ab/c",
+		"/ab",
+		"/a",
+	})
+
+	q = Query{
+		Limit:  3,
+		Offset: 2,
+		Prefix: "/ab",
+		Orders: []Order{OrderByKey{}},
+	}
+	testNaiveQueryApply(t, q, sampleKeys, []string{
+		"/ab/cd",
+		"/abce",
+		"/abcf",
+	})
+}
+
 func TestLimit(t *testing.T) {
 	testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) {
 		e := make([]Entry, len(keys))