Commit a1d868ee authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #66 from ipfs/feat/mountquery

Implemented cross-ds query in mount datastore
parents 18063741 af1c76e6
1.2.0: QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364
1.2.1: QmSiN66ybp5udnQnvhb6euiWiiQWdGvwMhAWa95cC1DTCV
language: go
go:
- 1.7.3
- 1.8
- tip
script:
......
......@@ -5,10 +5,10 @@ package mount
import (
"errors"
"io"
"sort"
"strings"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
......@@ -27,6 +27,7 @@ func New(mounts []Mount) *Datastore {
for i, v := range mounts {
m[i] = v
}
sort.Slice(m, func(i, j int) bool { return m[i].Prefix.String() > m[j].Prefix.String() })
return &Datastore{mounts: m}
}
......@@ -47,6 +48,38 @@ func (d *Datastore) lookup(key datastore.Key) (ds datastore.Datastore, mountpoin
return nil, datastore.NewKey("/"), key
}
// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
//
// / B /ao/e
// /a/ not matching
// /ao/ B /e
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
func (d *Datastore) lookupAll(key datastore.Key) (ds []datastore.Datastore, mountpoint, rest []datastore.Key) {
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}
if strings.HasPrefix(p, key.String()) {
ds = append(ds, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, datastore.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())
ds = append(ds, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, datastore.NewKey(r))
}
}
return ds, mountpoint, rest
}
func (d *Datastore) Put(key datastore.Key, value interface{}) error {
ds, _, k := d.lookup(key)
if ds == nil {
......@@ -84,36 +117,65 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
key := datastore.NewKey(q.Prefix)
ds, mount, k := d.lookup(key)
prefix := datastore.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)
// current itorator state
var res query.Results
var ds datastore.Datastore
var mount datastore.Key
var rest datastore.Key
i := 0
return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool
for try := true; try; try = len(dses) > i {
if ds == nil {
return nil, errors.New("mount only supports listing a mount point")
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}
// TODO support listing cross mount points too
// delegate the query to the mounted datastore, while adjusting
// keys in and out
q2 := q
q2.Prefix = k.String()
wrapDS := keytransform.Wrap(ds, &keytransform.Pair{
Convert: func(datastore.Key) datastore.Key {
panic("this should never be called")
},
Invert: func(k datastore.Key) datastore.Key {
return mount.Child(k)
},
})
ds = dses[i]
mount = mounts[i]
rest = rests[i]
r, err := wrapDS.Query(q2)
q2 := q
q2.Prefix = rest.String()
r, err := ds.Query(q2)
if err != nil {
return nil, err
return query.Result{Error: err}, false
}
res = r
}
r, more = res.NextSync()
if !more {
ds = nil
i++
more = len(dses) > i
} else {
break
}
r = query.ResultsReplaceQuery(r, q)
return r, nil
}
r.Key = mount.Child(datastore.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i {
return res.Close()
}
return nil
},
}), nil
}
func (d *Datastore) Close() error {
......
......@@ -239,3 +239,94 @@ func TestQuerySimple(t *testing.T) {
t.Errorf("did not see wanted key %q in %+v", myKey, entries)
}
}
func TestQueryCross(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
})
m.Put(datastore.NewKey("/foo/lorem"), "123")
m.Put(datastore.NewKey("/bar/ipsum"), "234")
m.Put(datastore.NewKey("/bar/dolor"), "345")
m.Put(datastore.NewKey("/baz/sit"), "456")
m.Put(datastore.NewKey("/banana"), "567")
res, err := m.Query(query.Query{Prefix: "/ba"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := 0
expect := map[string]string{
"/foo/lorem": "y u here",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
}
if v != e.Value {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
}
expect[e.Key] = "seen"
seen++
}
if seen != 4 {
t.Errorf("expected to see 3 values, saw %d", seen)
}
}
func TestLookupPrio(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
})
m.Put(datastore.NewKey("/foo/bar"), "123")
m.Put(datastore.NewKey("/baz"), "234")
found, err := mapds0.Has(datastore.NewKey("/baz"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
found, err = mapds0.Has(datastore.NewKey("/foo/bar"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
found, err = mapds1.Has(datastore.NewKey("/bar"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
......@@ -25,6 +25,6 @@
"license": "MIT",
"name": "go-datastore",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.2.0"
"version": "1.2.1"
}
......@@ -5,11 +5,11 @@ package syncmount
import (
"errors"
"io"
"sort"
"strings"
"sync"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
......@@ -28,6 +28,7 @@ func New(mounts []Mount) *Datastore {
for i, v := range mounts {
m[i] = v
}
sort.Slice(m, func(i, j int) bool { return m[i].Prefix.String() > m[j].Prefix.String() })
return &Datastore{mounts: m}
}
......@@ -51,6 +52,41 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}
// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
//
// / B /ao/e
// /a/ not matching
// /ao/ B /e
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
d.lk.Lock()
defer d.lk.Unlock()
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}
if strings.HasPrefix(p, key.String()) {
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r))
}
}
return dst, mountpoint, rest
}
func (d *Datastore) Put(key ds.Key, value interface{}) error {
cds, _, k := d.lookup(key)
if cds == nil {
......@@ -88,36 +124,65 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
key := ds.NewKey(q.Prefix)
cds, mount, k := d.lookup(key)
if cds == nil {
return nil, errors.New("mount only supports listing a mount point")
prefix := ds.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)
// current itorator state
var res query.Results
var dst ds.Datastore
var mount ds.Key
var rest ds.Key
i := 0
return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool
for try := true; try; try = len(dses) > i {
if dst == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}
// TODO support listing cross mount points too
// delegate the query to the mounted datastore, while adjusting
// keys in and out
q2 := q
q2.Prefix = k.String()
wrapDS := keytransform.Wrap(cds, &keytransform.Pair{
Convert: func(ds.Key) ds.Key {
panic("this should never be called")
},
Invert: func(k ds.Key) ds.Key {
return mount.Child(k)
},
})
dst = dses[i]
mount = mounts[i]
rest = rests[i]
r, err := wrapDS.Query(q2)
q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return nil, err
return query.Result{Error: err}, false
}
res = r
}
r, more = res.NextSync()
if !more {
dst = nil
i++
more = len(dses) > i
} else {
break
}
}
r = query.ResultsReplaceQuery(r, q)
return r, nil
r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Close: func() error {
if len(mounts) > i {
return res.Close()
}
return nil
},
}), nil
}
func (d *Datastore) IsThreadSafe() {}
......
......@@ -4,8 +4,8 @@ import (
"testing"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/mount"
"github.com/ipfs/go-datastore/query"
mount "github.com/ipfs/go-datastore/syncmount"
)
func TestPutBadNothing(t *testing.T) {
......@@ -239,3 +239,94 @@ func TestQuerySimple(t *testing.T) {
t.Errorf("did not see wanted key %q in %+v", myKey, entries)
}
}
func TestQueryCross(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
})
m.Put(datastore.NewKey("/foo/lorem"), "123")
m.Put(datastore.NewKey("/bar/ipsum"), "234")
m.Put(datastore.NewKey("/bar/dolor"), "345")
m.Put(datastore.NewKey("/baz/sit"), "456")
m.Put(datastore.NewKey("/banana"), "567")
res, err := m.Query(query.Query{Prefix: "/ba"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := 0
expect := map[string]string{
"/foo/lorem": "y u here",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
}
if v != e.Value {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
}
expect[e.Key] = "seen"
seen++
}
if seen != 4 {
t.Errorf("expected to see 3 values, saw %d", seen)
}
}
func TestLookupPrio(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
})
m.Put(datastore.NewKey("/foo/bar"), "123")
m.Put(datastore.NewKey("/baz"), "234")
found, err := mapds0.Has(datastore.NewKey("/baz"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
found, err = mapds0.Has(datastore.NewKey("/foo/bar"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
found, err = mapds1.Has(datastore.NewKey("/bar"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment