diff --git a/batch.go b/batch.go index 2917b2785b73bc8adcddd223aafa84ad63055e8f..57880dd2d39ef769ddd1d62dc3b7f8ab4ceb8cf5 100644 --- a/batch.go +++ b/batch.go @@ -1,7 +1,5 @@ package datastore -type verb int - type op struct { delete bool value []byte diff --git a/key_test.go b/key_test.go index 9c7e05d15631d9827d305675bf3a4797c32ec134..aeeb2848dbd95b669edb5316acd155b49634cdc8 100644 --- a/key_test.go +++ b/key_test.go @@ -2,7 +2,6 @@ package datastore_test import ( "bytes" - "math/rand" "path" "strings" "testing" @@ -14,16 +13,6 @@ import ( // Hook up gocheck into the "go test" runner. func Test(t *testing.T) { TestingT(t) } -func randomString() string { - chars := "abcdefghijklmnopqrstuvwxyz1234567890" - var buf bytes.Buffer - l := rand.Intn(50) - for j := 0; j < l; j++ { - buf.WriteByte(chars[rand.Intn(len(chars))]) - } - return buf.String() -} - type KeySuite struct{} var _ = Suite(&KeySuite{}) diff --git a/query/filter_test.go b/query/filter_test.go index b4c7d8ac49c6180ef6a2db34f8a67f298d8bce64..55919166e77858fa7707c256be66610a7ac966e5 100644 --- a/query/filter_test.go +++ b/query/filter_test.go @@ -5,12 +5,6 @@ import ( "testing" ) -type filterTestCase struct { - filter Filter - keys []string - expect []string -} - func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) { e := make([]Entry, len(keys)) for i, k := range keys { diff --git a/query/order_test.go b/query/order_test.go index 648304172fde197af841bb74f93920eb5c5cabfe..5012c13cc4b6d0e8f470433b3b6aaf3b5cf3850c 100644 --- a/query/order_test.go +++ b/query/order_test.go @@ -5,12 +5,6 @@ import ( "testing" ) -type orderTestCase struct { - order Order - keys []string - expect []string -} - func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) { e := make([]Entry, len(keys)) for i, k := range keys { diff --git a/query/query.go b/query/query.go index 691fbce4c9265cfa4012b00ef2b6bfe419b47bd8..fa206da9a00cd34903472391b1c451d85ded210d 100644 --- a/query/query.go +++ b/query/query.go @@ -75,7 +75,8 @@ type Entry struct { } // Result is a special entry that includes an error, so that the client -// may be warned about internal errors. +// may be warned about internal errors. If Error is non-nil, Entry must be +// empty. type Result struct { Entry @@ -203,12 +204,12 @@ func NewResultBuilder(q Query) *ResultBuilder { } // ResultsWithChan returns a Results object from a channel -// of Result entries. Respects its own Close() +// of Result entries. +// +// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it +// will leave anything trying to write to the result channel hanging. func ResultsWithChan(q Query, res <-chan Result) Results { - b := NewResultBuilder(q) - - // go consume all the entries and add them to the results. - b.Process.Go(func(worker goprocess.Process) { + return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) { for { select { case <-worker.Closing(): // client told us to close early @@ -219,13 +220,24 @@ func ResultsWithChan(q Query, res <-chan Result) Results { } select { - case b.Output <- e: + case out <- e: case <-worker.Closing(): // client told us to close early return } } } }) +} + +// ResultsWithProcess returns a Results object with the results generated by the +// passed subprocess. +func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results { + b := NewResultBuilder(q) + + // go consume all the entries and add them to the results. + b.Process.Go(func(worker goprocess.Process) { + proc(worker, b.Output) + }) go b.Process.CloseAfterChildren() return b.Results() diff --git a/query/query_impl.go b/query/query_impl.go index 440513ce94cbd718134f5dbc1aeeaa499b313f6a..863ab81f6239a57243a3eb54ef0d82b9eb762993 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -1,78 +1,79 @@ package query -import "sort" +import ( + "sort" -func DerivedResults(qr Results, ch <-chan Result) Results { - return &results{ - query: qr.Query(), - proc: qr.Process(), - res: ch, - } -} + goprocess "github.com/jbenet/goprocess" +) // NaiveFilter applies a filter to the results. func NaiveFilter(qr Results, filter Filter) Results { - ch := make(chan Result) - go func() { - defer close(ch) - defer qr.Close() - - for e := range qr.Next() { - if e.Error != nil || filter.Filter(e.Entry) { - ch <- e + return ResultsFromIterator(qr.Query(), Iterator{ + Next: func() (Result, bool) { + for { + e, ok := qr.NextSync() + if !ok { + return Result{}, false + } + if e.Error != nil || filter.Filter(e.Entry) { + return e, true + } } - } - }() - - return ResultsWithChan(qr.Query(), ch) + }, + Close: func() error { + return qr.Close() + }, + }) } // NaiveLimit truncates the results to a given int limit func NaiveLimit(qr Results, limit int) Results { - ch := make(chan Result) - go func() { - defer close(ch) - defer qr.Close() - - l := 0 - for e := range qr.Next() { - if e.Error != nil { - ch <- e - continue + if limit == 0 { + // 0 means no limit + return qr + } + closed := false + return ResultsFromIterator(qr.Query(), Iterator{ + Next: func() (Result, bool) { + if limit == 0 { + if !closed { + closed = true + err := qr.Close() + if err != nil { + return Result{Error: err}, true + } + } + return Result{}, false } - ch <- e - l++ - if limit > 0 && l >= limit { - break + limit-- + return qr.NextSync() + }, + Close: func() error { + if closed { + return nil } - } - }() - - return ResultsWithChan(qr.Query(), ch) + closed = true + return qr.Close() + }, + }) } // NaiveOffset skips a given number of results func NaiveOffset(qr Results, offset int) Results { - ch := make(chan Result) - go func() { - defer close(ch) - defer qr.Close() - - sent := 0 - for e := range qr.Next() { - if e.Error != nil { - ch <- e - } - - if sent < offset { - sent++ - continue + return ResultsFromIterator(qr.Query(), Iterator{ + Next: func() (Result, bool) { + for ; offset > 0; offset-- { + res, ok := qr.NextSync() + if !ok || res.Error != nil { + return res, ok + } } - ch <- e - } - }() - - return ResultsWithChan(qr.Query(), ch) + return qr.NextSync() + }, + Close: func() error { + return qr.Close() + }, + }) } // NaiveOrder reorders results according to given orders. @@ -83,29 +84,37 @@ func NaiveOrder(qr Results, orders ...Order) Results { return qr } - ch := make(chan Result) - var entries []Entry - go func() { - defer close(ch) + return ResultsWithProcess(qr.Query(), func(worker goprocess.Process, out chan<- Result) { defer qr.Close() - - for e := range qr.Next() { - if e.Error != nil { - ch <- e + var entries []Entry + collect: + for { + select { + case <-worker.Closing(): + return + case e, ok := <-qr.Next(): + if !ok { + break collect + } + if e.Error != nil { + out <- e + continue + } + entries = append(entries, e.Entry) } - - entries = append(entries, e.Entry) } + sort.Slice(entries, func(i int, j int) bool { return Less(orders, entries[i], entries[j]) }) - for _, e := range entries { - ch <- Result{Entry: e} + select { + case <-worker.Closing(): + return + case out <- Result{Entry: e}: + } } - }() - - return DerivedResults(qr, ch) + }) } func NaiveQueryApply(q Query, qr Results) Results {