Commit 882341c7 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

query: send back errors when channels somehow

this is not a good inteface. should fix it
parent a706e049
...@@ -79,44 +79,53 @@ func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { ...@@ -79,44 +79,53 @@ func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) {
} }
i := d.DB.NewIterator(rnge, nil) i := d.DB.NewIterator(rnge, nil)
// offset // buffer this channel so that we dont totally block leveldb if client
if q.Offset > 0 { // is not reading from chan.
for j := 0; j < q.Offset; j++ { ch := make(chan dsq.Entry, 1000)
i.Next() qr := dsq.ResultsWithEntriesChan(q, ch)
// qr := dsq.ResultsWithEntries(q, es)
go func() {
defer close(ch)
// offset
if q.Offset > 0 {
for j := 0; j < q.Offset; j++ {
i.Next()
}
} }
}
var es []dsq.Entry sent := 0
for i.Next() { for i.Next() {
// limit // limit
if q.Limit > 0 && len(es) >= q.Limit { if q.Limit > 0 && sent >= q.Limit {
break break
} }
k := ds.NewKey(string(i.Key())).String() k := ds.NewKey(string(i.Key())).String()
e := dsq.Entry{Key: k} e := dsq.Entry{Key: k}
if !q.KeysOnly { if !q.KeysOnly {
buf := make([]byte, len(i.Value())) buf := make([]byte, len(i.Value()))
copy(buf, i.Value()) copy(buf, i.Value())
e.Value = buf e.Value = buf
} }
es = append(es, e) ch <- e
} sent++
i.Release() }
if err := i.Error(); err != nil { i.Release()
return nil, err if err := i.Error(); err != nil {
} qr.Err() <- err
}
}()
// Now, apply remaining pieces. // Now, apply remaining pieces.
q2 := q q2 := q
q2.Offset = 0 // already applied q2.Offset = 0 // already applied
q2.Limit = 0 // already applied q2.Limit = 0 // already applied
// TODO: make this async with:
// qr := dsq.ResultsWithEntriesChan(q, ch)
qr := dsq.ResultsWithEntries(q, es)
qr = q2.ApplyTo(qr) qr = q2.ApplyTo(qr)
qr.Query = q // set it back qr.Query = q // set it back
return qr, nil return qr, nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment