Commit 864d79d4 authored by Łukasz Magiera's avatar Łukasz Magiera

Implement cross datastore Query in mount DS

parent cd593edd
......@@ -9,7 +9,6 @@ import (
"strings"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
......@@ -22,27 +21,13 @@ type Mount struct {
Datastore datastore.Datastore
}
type MountSlice []Mount
func (m MountSlice) Len() int {
return len(m)
}
func (m MountSlice) Less(i, j int) bool {
return m[i].Prefix.String() > m[j].Prefix.String()
}
func (m MountSlice) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
}
func New(mounts []Mount) *Datastore {
// make a copy so we're sure it doesn't mutate
m := make([]Mount, len(mounts))
for i, v := range mounts {
m[i] = v
}
sort.Sort(MountSlice(m))
sort.Slice(m, func(i, j int) bool { return m[i].Prefix.String() > m[j].Prefix.String() })
return &Datastore{mounts: m}
}
......@@ -63,6 +48,38 @@ func (d *Datastore) lookup(key datastore.Key) (ds datastore.Datastore, mountpoin
return nil, datastore.NewKey("/"), key
}
// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
//
// / B /ao/e
// /a/ not matching
// /ao/ B /e
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
func (d *Datastore) lookupAll(key datastore.Key) (ds []datastore.Datastore, mountpoint, rest []datastore.Key) {
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}
if strings.HasPrefix(p, key.String()) {
ds = append(ds, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, datastore.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())
ds = append(ds, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, datastore.NewKey(r))
}
}
return ds, mountpoint, rest
}
func (d *Datastore) Put(key datastore.Key, value interface{}) error {
ds, _, k := d.lookup(key)
if ds == nil {
......@@ -100,36 +117,65 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
key := datastore.NewKey(q.Prefix)
ds, mount, k := d.lookup(key)
if ds == nil {
return nil, errors.New("mount only supports listing a mount point")
}
// TODO support listing cross mount points too
// delegate the query to the mounted datastore, while adjusting
// keys in and out
q2 := q
q2.Prefix = k.String()
wrapDS := keytransform.Wrap(ds, &keytransform.Pair{
Convert: func(datastore.Key) datastore.Key {
panic("this should never be called")
prefix := datastore.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)
// current itorator state
var res query.Results
var ds datastore.Datastore
var mount datastore.Key
var rest datastore.Key
i := 0
return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool
for try := true; try; try = len(dses) > i {
if ds == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}
ds = dses[i]
mount = mounts[i]
rest = rests[i]
q2 := q
q2.Prefix = rest.String()
r, err := ds.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}
r, more = res.NextSync()
if !more {
ds = nil
i++
more = len(dses) > i
} else {
break
}
}
r.Key = mount.Child(datastore.RawKey(r.Key)).String()
return r, more
},
Invert: func(k datastore.Key) datastore.Key {
return mount.Child(k)
Close: func() error {
if len(mounts) > i {
return res.Close()
}
return nil
},
})
r, err := wrapDS.Query(q2)
if err != nil {
return nil, err
}
r = query.ResultsReplaceQuery(r, q)
return r, nil
}), nil
}
func (d *Datastore) Close() error {
......
......@@ -240,6 +240,60 @@ func TestQuerySimple(t *testing.T) {
}
}
func TestQueryCross(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
})
m.Put(datastore.NewKey("/foo/lorem"), "123")
m.Put(datastore.NewKey("/bar/ipsum"), "234")
m.Put(datastore.NewKey("/bar/dolor"), "345")
m.Put(datastore.NewKey("/baz/sit"), "456")
m.Put(datastore.NewKey("/banana"), "567")
res, err := m.Query(query.Query{Prefix: "/ba"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := 0
expect := map[string]string{
"/foo/lorem": "y u here",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
}
if v != e.Value {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
}
expect[e.Key] = "seen"
seen++
}
if seen != 4 {
t.Errorf("expected to see 3 values, saw %d", seen)
}
}
func TestLookupPrio(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
......
......@@ -10,7 +10,6 @@ import (
"sync"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
......@@ -23,27 +22,13 @@ type Mount struct {
Datastore ds.Datastore
}
type MountSlice []Mount
func (m MountSlice) Len() int {
return len(m)
}
func (m MountSlice) Less(i, j int) bool {
return m[i].Prefix.String() > m[j].Prefix.String()
}
func (m MountSlice) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
}
func New(mounts []Mount) *Datastore {
// make a copy so we're sure it doesn't mutate
m := make([]Mount, len(mounts))
for i, v := range mounts {
m[i] = v
}
sort.Sort(MountSlice(m))
sort.Slice(m, func(i, j int) bool { return m[i].Prefix.String() > m[j].Prefix.String() })
return &Datastore{mounts: m}
}
......@@ -67,6 +52,41 @@ func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
return nil, ds.NewKey("/"), key
}
// lookupAll returns all mounts that might contain keys that are descendant of <key>
//
// Matching: /ao/e
//
// / B /ao/e
// /a/ not matching
// /ao/ B /e
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
d.lk.Lock()
defer d.lk.Unlock()
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}
if strings.HasPrefix(p, key.String()) {
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r))
}
}
return dst, mountpoint, rest
}
func (d *Datastore) Put(key ds.Key, value interface{}) error {
cds, _, k := d.lookup(key)
if cds == nil {
......@@ -104,36 +124,65 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
// TODO this is still overly simplistic, but the only callers are
// `ipfs refs local` and ipfs-ds-convert.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
key := ds.NewKey(q.Prefix)
cds, mount, k := d.lookup(key)
if cds == nil {
return nil, errors.New("mount only supports listing a mount point")
}
// TODO support listing cross mount points too
// delegate the query to the mounted datastore, while adjusting
// keys in and out
q2 := q
q2.Prefix = k.String()
wrapDS := keytransform.Wrap(cds, &keytransform.Pair{
Convert: func(ds.Key) ds.Key {
panic("this should never be called")
prefix := ds.NewKey(q.Prefix)
dses, mounts, rests := d.lookupAll(prefix)
// current itorator state
var res query.Results
var dst ds.Datastore
var mount ds.Key
var rest ds.Key
i := 0
return query.ResultsFromIterator(q, query.Iterator{
Next: func() (query.Result, bool) {
var r query.Result
var more bool
for try := true; try; try = len(dses) > i {
if dst == nil {
if len(dses) <= i {
//This should not happen normally
return query.Result{}, false
}
dst = dses[i]
mount = mounts[i]
rest = rests[i]
q2 := q
q2.Prefix = rest.String()
r, err := dst.Query(q2)
if err != nil {
return query.Result{Error: err}, false
}
res = r
}
r, more = res.NextSync()
if !more {
dst = nil
i++
more = len(dses) > i
} else {
break
}
}
r.Key = mount.Child(ds.RawKey(r.Key)).String()
return r, more
},
Invert: func(k ds.Key) ds.Key {
return mount.Child(k)
Close: func() error {
if len(mounts) > i {
return res.Close()
}
return nil
},
})
r, err := wrapDS.Query(q2)
if err != nil {
return nil, err
}
r = query.ResultsReplaceQuery(r, q)
return r, nil
}), nil
}
func (d *Datastore) IsThreadSafe() {}
......
......@@ -240,6 +240,60 @@ func TestQuerySimple(t *testing.T) {
}
}
func TestQueryCross(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
})
m.Put(datastore.NewKey("/foo/lorem"), "123")
m.Put(datastore.NewKey("/bar/ipsum"), "234")
m.Put(datastore.NewKey("/bar/dolor"), "345")
m.Put(datastore.NewKey("/baz/sit"), "456")
m.Put(datastore.NewKey("/banana"), "567")
res, err := m.Query(query.Query{Prefix: "/ba"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := 0
expect := map[string]string{
"/foo/lorem": "y u here",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
}
if v != e.Value {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
}
expect[e.Key] = "seen"
seen++
}
if seen != 4 {
t.Errorf("expected to see 3 values, saw %d", seen)
}
}
func TestLookupPrio(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment