Commit 083e0167 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

updated datastore for proper query handling

Queries now can be cancelled and the resources collected
parent 8ecb97e3
...@@ -5,6 +5,7 @@ package blockstore ...@@ -5,6 +5,7 @@ package blockstore
import ( import (
"errors" "errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
...@@ -12,8 +13,11 @@ import ( ...@@ -12,8 +13,11 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
) )
var log = eventlog.Logger("blockstore")
// BlockPrefix namespaces blockstore datastores // BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks") var BlockPrefix = ds.NewKey("blocks")
...@@ -27,7 +31,9 @@ type Blockstore interface { ...@@ -27,7 +31,9 @@ type Blockstore interface {
Has(u.Key) (bool, error) Has(u.Key) (bool, error)
Get(u.Key) (*blocks.Block, error) Get(u.Key) (*blocks.Block, error)
Put(*blocks.Block) error Put(*blocks.Block) error
AllKeys(offset int, limit int) ([]u.Key, error)
AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error)
AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error)
} }
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
...@@ -80,10 +86,29 @@ func (s *blockstore) DeleteBlock(k u.Key) error { ...@@ -80,10 +86,29 @@ func (s *blockstore) DeleteBlock(k u.Key) error {
// AllKeys runs a query for keys from the blockstore. // AllKeys runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param? // this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored. // if offset and limit are 0, they are ignored.
func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { //
// AllKeys respects context
func (bs *blockstore) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
ch, err := bs.AllKeysChan(ctx, offset, limit)
if err != nil {
return nil, err
}
var keys []u.Key var keys []u.Key
for k := range ch {
keys = append(keys, k)
}
return keys, nil
}
// AllKeys runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
//
// AllKeys respects context
func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
// TODO make async inside ds/leveldb.Query
// KeysOnly, because that would be _a lot_ of data. // KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit} q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit}
res, err := bs.datastore.Query(q) res, err := bs.datastore.Query(q)
...@@ -91,10 +116,46 @@ func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { ...@@ -91,10 +116,46 @@ func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) {
return nil, err return nil, err
} }
for e := range res.Entries() { // this function is here to compartmentalize
get := func() (k u.Key, ok bool) {
select {
case <-ctx.Done():
return k, false
case e, more := <-res.Next():
if !more {
return k, false
}
if e.Error != nil {
log.Debug("blockstore.AllKeysChan got err:", e.Error)
return k, false
}
// need to convert to u.Key using u.KeyFromDsKey. // need to convert to u.Key using u.KeyFromDsKey.
k := u.KeyFromDsKey(ds.NewKey(e.Key)) k = u.KeyFromDsKey(ds.NewKey(e.Key))
keys = append(keys, k) return k, true
} }
return keys, nil }
output := make(chan u.Key)
go func() {
defer func() {
res.Process().Close() // ensure exit (signals early exit, too)
close(output)
}()
for {
k, ok := get()
if !ok {
return
}
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()
return output, nil
} }
...@@ -5,8 +5,11 @@ import ( ...@@ -5,8 +5,11 @@ import (
"fmt" "fmt"
"testing" "testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
...@@ -42,9 +45,11 @@ func TestPutThenGetBlock(t *testing.T) { ...@@ -42,9 +45,11 @@ func TestPutThenGetBlock(t *testing.T) {
} }
} }
func TestAllKeys(t *testing.T) { func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []u.Key) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) if d == nil {
N := 100 d = ds.NewMapDatastore()
}
bs := NewBlockstore(ds_sync.MutexWrap(d))
keys := make([]u.Key, N) keys := make([]u.Key, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
...@@ -55,8 +60,14 @@ func TestAllKeys(t *testing.T) { ...@@ -55,8 +60,14 @@ func TestAllKeys(t *testing.T) {
} }
keys[i] = block.Key() keys[i] = block.Key()
} }
return bs, keys
}
func TestAllKeysSimple(t *testing.T) {
bs, keys := newBlockStoreWithKeys(t, nil, 100)
keys2, err := bs.AllKeys(0, 0) ctx := context.Background()
keys2, err := bs.AllKeys(ctx, 0, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -65,8 +76,14 @@ func TestAllKeys(t *testing.T) { ...@@ -65,8 +76,14 @@ func TestAllKeys(t *testing.T) {
// } // }
expectMatches(t, keys, keys2) expectMatches(t, keys, keys2)
}
keys3, err := bs.AllKeys(N/3, N/3) func TestAllKeysOffsetAndLimit(t *testing.T) {
N := 30
bs, _ := newBlockStoreWithKeys(t, nil, N)
ctx := context.Background()
keys3, err := bs.AllKeys(ctx, N/3, N/3)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -76,6 +93,114 @@ func TestAllKeys(t *testing.T) { ...@@ -76,6 +93,114 @@ func TestAllKeys(t *testing.T) {
if len(keys3) != N/3 { if len(keys3) != N/3 {
t.Errorf("keys3 should be: %d != %d", N/3, len(keys3)) t.Errorf("keys3 should be: %d != %d", N/3, len(keys3))
} }
}
func TestAllKeysRespectsContext(t *testing.T) {
N := 100
d := &queryTestDS{ds: ds.NewMapDatastore()}
bs, _ := newBlockStoreWithKeys(t, d, N)
started := make(chan struct{}, 1)
done := make(chan struct{}, 1)
errors := make(chan error, 100)
getKeys := func(ctx context.Context) {
started <- struct{}{}
_, err := bs.AllKeys(ctx, 0, 0) // once without cancelling
if err != nil {
errors <- err
}
done <- struct{}{}
errors <- nil // a nil one to signal break
}
// Once without context, to make sure it all works
{
var results dsq.Results
resultChan := make(chan dsq.Result)
d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
results = dsq.ResultsWithChan(q, resultChan)
return results, nil
})
go getKeys(context.Background())
// make sure it's waiting.
<-started
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closing():
t.Fatal("should not be closing")
case <-results.Process().Closed():
t.Fatal("should not be closed")
default:
}
e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
<-results.Process().Closed() // should be closed now
// print any errors
for err := range errors {
if err == nil {
break
}
t.Error(err)
}
}
// Once with
{
var results dsq.Results
resultChan := make(chan dsq.Result)
d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
results = dsq.ResultsWithChan(q, resultChan)
return results, nil
})
ctx, cancel := context.WithCancel(context.Background())
go getKeys(ctx)
// make sure it's waiting.
<-started
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closing():
t.Fatal("should not be closing")
case <-results.Process().Closed():
t.Fatal("should not be closed")
default:
}
cancel() // let it go.
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closed():
t.Fatal("should not be closed") // should not be closed yet.
case <-results.Process().Closing():
// should be closing now!
t.Log("closing correctly at this point.")
}
close(resultChan)
<-done // should be done now.
<-results.Process().Closed() // should be closed now
// print any errors
for err := range errors {
if err == nil {
break
}
t.Error(err)
}
}
} }
...@@ -111,3 +236,33 @@ func expectMatches(t *testing.T, expect, actual []u.Key) { ...@@ -111,3 +236,33 @@ func expectMatches(t *testing.T, expect, actual []u.Key) {
} }
} }
} }
type queryTestDS struct {
cb func(q dsq.Query) (dsq.Results, error)
ds ds.Datastore
}
func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f }
func (c *queryTestDS) Put(key ds.Key, value interface{}) (err error) {
return c.ds.Put(key, value)
}
func (c *queryTestDS) Get(key ds.Key) (value interface{}, err error) {
return c.ds.Get(key)
}
func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) {
return c.ds.Has(key)
}
func (c *queryTestDS) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) {
if c.cb != nil {
return c.cb(q)
}
return c.ds.Query(q)
}
package blockstore package blockstore
import ( import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru"
"github.com/jbenet/go-ipfs/blocks" "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
...@@ -44,6 +46,10 @@ func (w *writecache) Put(b *blocks.Block) error { ...@@ -44,6 +46,10 @@ func (w *writecache) Put(b *blocks.Block) error {
return w.blockstore.Put(b) return w.blockstore.Put(b)
} }
func (w *writecache) AllKeys(offset int, limit int) ([]u.Key, error) { func (w *writecache) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
return w.blockstore.AllKeys(offset, limit) return w.blockstore.AllKeys(ctx, offset, limit)
}
func (w *writecache) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
return w.blockstore.AllKeysChan(ctx, offset, limit)
} }
...@@ -84,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) { ...@@ -84,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) {
return c.ds.Delete(key) return c.ds.Delete(key)
} }
func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) { func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
c.f() c.f()
return c.ds.Query(q) return c.ds.Query(q)
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment