Commit aaac529a authored by Juan Batiz-Benet

better query, respecting cancel

parent 882341c7
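The gist of the change: query results are now produced by a worker goroutine running under its own goprocess, so a client that stops reading (or closes the Results) no longer leaves the worker blocked on an unread channel. A rough consumer sketch, not part of this commit, assuming the Next() and Close() methods on dsq.Results from the query package of this vintage (Rest() is the one the tests below use):

// Sketch only: read a couple of results, then stop early. Next() and Close()
// on dsq.Results are assumed here, not shown in this diff.
func firstTwo(d *datastore) ([]dsq.Entry, error) {
    res, err := d.Query(dsq.Query{Prefix: "/a/"})
    if err != nil {
        return nil, err
    }
    defer res.Close() // closing the Results fires worker.Closing() inside runQuery

    var out []dsq.Entry
    for r := range res.Next() {
        if r.Error != nil {
            return out, r.Error
        }
        out = append(out, r.Entry)
        if len(out) == 2 {
            break // stop early; the worker selects on Closing() instead of blocking
        }
    }
    return out, nil
}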
@@ -3,11 +3,11 @@ package leveldb
 import (
     "io"
 
-    ds "github.com/jbenet/go-datastore"
+    "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
     "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
     "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
     "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
+    ds "github.com/jbenet/go-datastore"
     dsq "github.com/jbenet/go-datastore/query"
 )
@@ -72,63 +72,80 @@ func (d *datastore) Delete(key ds.Key) (err error) {
     return err
 }
 
-func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
-    var rnge *util.Range
-    if q.Prefix != "" {
-        rnge = util.BytesPrefix([]byte(q.Prefix))
-    }
-    i := d.DB.NewIterator(rnge, nil)
-
-    // buffer this channel so that we dont totally block leveldb if client
-    // is not reading from chan.
-    ch := make(chan dsq.Entry, 1000)
-    qr := dsq.ResultsWithEntriesChan(q, ch)
-    // qr := dsq.ResultsWithEntries(q, es)
-
-    go func() {
-        defer close(ch)
-
-        // offset
-        if q.Offset > 0 {
-            for j := 0; j < q.Offset; j++ {
-                i.Next()
-            }
-        }
-
-        sent := 0
-        for i.Next() {
-            // limit
-            if q.Limit > 0 && sent >= q.Limit {
-                break
-            }
-
-            k := ds.NewKey(string(i.Key())).String()
-            e := dsq.Entry{Key: k}
-
-            if !q.KeysOnly {
-                buf := make([]byte, len(i.Value()))
-                copy(buf, i.Value())
-                e.Value = buf
-            }
-
-            ch <- e
-            sent++
-        }
-        i.Release()
-        if err := i.Error(); err != nil {
-            qr.Err() <- err
-        }
-    }()
-
-    // Now, apply remaining pieces.
-    q2 := q
-    q2.Offset = 0 // already applied
-    q2.Limit = 0  // already applied
-
-    qr = q2.ApplyTo(qr)
-    qr.Query = q // set it back
-    return qr, nil
-}
+func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
+    // we can use multiple iterators concurrently. see:
+    // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
+    // advance the iterator only if the reader reads
+    //
+    // run query in own sub-process tied to Results.Process(), so that
+    // it waits for us to finish AND so that clients can signal to us
+    // that resources should be reclaimed.
+    qrb := dsq.NewResultBuilder(q)
+    qrb.Process.Go(func(worker goprocess.Process) {
+        d.runQuery(worker, qrb)
+    })
+
+    // go wait on the worker (without signaling close)
+    go qrb.Process.CloseAfterChildren()
+
+    // Now, apply remaining things (filters, order)
+    qr := qrb.Results()
+    for _, f := range q.Filters {
+        qr = dsq.NaiveFilter(qr, f)
+    }
+    for _, o := range q.Orders {
+        qr = dsq.NaiveOrder(qr, o)
+    }
+    return qr, nil
+}
+
+func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
+    var rnge *util.Range
+    if qrb.Query.Prefix != "" {
+        rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
+    }
+    i := d.DB.NewIterator(rnge, nil)
+    defer i.Release()
+
+    // advance iterator for offset
+    if qrb.Query.Offset > 0 {
+        for j := 0; j < qrb.Query.Offset; j++ {
+            i.Next()
+        }
+    }
+
+    // iterate, and handle limit, too
+    for sent := 0; i.Next(); sent++ {
+        // end early if we hit the limit
+        if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
+            break
+        }
+
+        k := ds.NewKey(string(i.Key())).String()
+        e := dsq.Entry{Key: k}
+
+        if !qrb.Query.KeysOnly {
+            buf := make([]byte, len(i.Value()))
+            copy(buf, i.Value())
+            e.Value = buf
+        }
+
+        select {
+        case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
+        case <-worker.Closing(): // client told us to end early.
+            break
+        }
+    }
+
+    if err := i.Error(); err != nil {
+        select {
+        case qrb.Output <- dsq.Result{Error: err}: // client read our error
+        case <-worker.Closing(): // client told us to end.
+            return
+        }
+    }
+}
 
 // LevelDB needs to be closed.
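The rewritten Query leans on the goprocess package imported above. A minimal, self-contained illustration of the pattern (not from this repository): a worker that only sends when the consumer reads, and that exits once the owning process is closed.

package main

import (
    "fmt"

    "github.com/jbenet/goprocess"
)

func main() {
    out := make(chan int)

    // goprocess.Go runs the worker under a new process.
    p := goprocess.Go(func(worker goprocess.Process) {
        for i := 0; ; i++ {
            select {
            case out <- i: // the consumer read a value
            case <-worker.Closing(): // the consumer closed the process
                return
            }
        }
    })

    fmt.Println(<-out, <-out) // read a couple of values
    p.Close()                 // signal the worker and wait for it to return
}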
@@ -20,21 +20,28 @@ var testcases = map[string]string{
     "/f": "f",
 }
 
-func TestQuery(t *testing.T) {
+// returns datastore, and a function to call on exit.
+// (this garbage collects). So:
+//
+//   d, close := newDS(t)
+//   defer close()
+func newDS(t *testing.T) (Datastore, func()) {
     path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
     if err != nil {
         t.Fatal(err)
     }
-    defer func() {
-        os.RemoveAll(path)
-    }()
     d, err := NewDatastore(path, nil)
     if err != nil {
         t.Fatal(err)
     }
-    defer d.Close()
+    return d, func() {
+        os.RemoveAll(path)
+        d.Close()
+    }
+}
 
+func addTestCases(t *testing.T, d Datastore, testcases map[string]string) {
     for k, v := range testcases {
         dsk := ds.NewKey(k)
         if err := d.Put(dsk, []byte(v)); err != nil {
@@ -54,6 +61,13 @@ func TestQuery(t *testing.T) {
         }
     }
+}
+
+func TestQuery(t *testing.T) {
+    d, close := newDS(t)
+    defer close()
+
+    addTestCases(t, d, testcases)
 
     rs, err := d.Query(dsq.Query{Prefix: "/a/"})
     if err != nil {
         t.Fatal(err)
@@ -65,7 +79,7 @@ func TestQuery(t *testing.T) {
         "/a/b/d",
         "/a/c",
         "/a/d",
-    }, rs.AllEntries())
+    }, rs)
 
     // test offset and limit
@@ -77,11 +91,22 @@ func TestQuery(t *testing.T) {
     expectMatches(t, []string{
         "/a/b/d",
         "/a/c",
-    }, rs.AllEntries())
+    }, rs)
 }
 
+func TestQueryRespectsProcess(t *testing.T) {
+    d, close := newDS(t)
+    defer close()
+    addTestCases(t, d, testcases)
+}
+
-func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) {
+func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
+    actual, err := actualR.Rest()
+    if err != nil {
+        t.Error(err)
+    }
+
     if len(actual) != len(expect) {
         t.Error("not enough", expect, actual)
     }
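The new TestQueryRespectsProcess only seeds the datastore so far. A fuller check of the cancellation path might look roughly like the following hypothetical sketch, which again assumes Next() and Close() on dsq.Results:

// Hypothetical, not part of this commit: read one result and then close the
// Results early. With the new runQuery the worker selects on worker.Closing()
// and exits instead of blocking on an unread output channel.
func TestQueryCloseEarly(t *testing.T) {
    d, close := newDS(t)
    defer close()
    addTestCases(t, d, testcases)

    rs, err := d.Query(dsq.Query{Prefix: "/a/"})
    if err != nil {
        t.Fatal(err)
    }

    <-rs.Next() // consume a single result
    if err := rs.Close(); err != nil { // signal the query worker to stop
        t.Fatal(err)
    }
}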