Commit 5598edf1 authored by Steven Allen's avatar Steven Allen

fix(key): only count a key as an ancestor or prefix if there is a separator

Also make sure to clean and normalize keys before using them as prefixes.

BREAKING CHANGES:

* `myds.Query(Query{Prefix:"/foo"})` will no longer match "/foobar" (or even
  "/foo"). This is usually what the user expects, we had a tendency to normalize
  "/foo/" to "/foo" (when we clean keys), and many datastores can't efficiently
  search for prefixes that aren't on path-boundaries anyways.
* Given a mount datastore with mounts `["/foo", "/"]`, `myds.Put("/foo", "bar")`
  will put the value to the datastore mounted at "/", not "/foo", as the key "/"
  and "" usually doesn't make sense.

While technically breaking, these changes are much more likely to fix bugs than
they are to break things.
parent e9f6023a
...@@ -212,20 +212,27 @@ func (k Key) ChildString(s string) Key { ...@@ -212,20 +212,27 @@ func (k Key) ChildString(s string) Key {
// NewKey("/Comedy").IsAncestorOf("/Comedy/MontyPython") // NewKey("/Comedy").IsAncestorOf("/Comedy/MontyPython")
// true // true
func (k Key) IsAncestorOf(other Key) bool { func (k Key) IsAncestorOf(other Key) bool {
if other.string == k.string { // equivalent to HasPrefix(other, k.string + "/")
if len(other.string) <= len(k.string) {
// We're not long enough to be a child.
return false return false
} }
return strings.HasPrefix(other.string, k.string)
if k.string == "/" {
// We're the root and the other key is longer.
return true
}
// "other" starts with /k.string/
return other.string[len(k.string)] == '/' && other.string[:len(k.string)] == k.string
} }
// IsDescendantOf returns whether this key contains another as a prefix. // IsDescendantOf returns whether this key contains another as a prefix.
// NewKey("/Comedy/MontyPython").IsDescendantOf("/Comedy") // NewKey("/Comedy/MontyPython").IsDescendantOf("/Comedy")
// true // true
func (k Key) IsDescendantOf(other Key) bool { func (k Key) IsDescendantOf(other Key) bool {
if other.string == k.string { return other.IsAncestorOf(k)
return false
}
return strings.HasPrefix(k.string, other.string)
} }
// IsTopLevel returns whether this key has only one namespace. // IsTopLevel returns whether this key has only one namespace.
......
...@@ -84,19 +84,23 @@ func CheckTrue(c *C, cond bool) { ...@@ -84,19 +84,23 @@ func CheckTrue(c *C, cond bool) {
func (ks *KeySuite) TestKeyAncestry(c *C) { func (ks *KeySuite) TestKeyAncestry(c *C) {
k1 := NewKey("/A/B/C") k1 := NewKey("/A/B/C")
k2 := NewKey("/A/B/C/D") k2 := NewKey("/A/B/C/D")
k3 := NewKey("/AB")
k4 := NewKey("/A")
c.Check(k1.String(), Equals, "/A/B/C") c.Check(k1.String(), Equals, "/A/B/C")
c.Check(k2.String(), Equals, "/A/B/C/D") c.Check(k2.String(), Equals, "/A/B/C/D")
CheckTrue(c, k1.IsAncestorOf(k2)) CheckTrue(c, k1.IsAncestorOf(k2))
CheckTrue(c, k2.IsDescendantOf(k1)) CheckTrue(c, k2.IsDescendantOf(k1))
CheckTrue(c, NewKey("/A").IsAncestorOf(k2)) CheckTrue(c, k4.IsAncestorOf(k2))
CheckTrue(c, NewKey("/A").IsAncestorOf(k1)) CheckTrue(c, k4.IsAncestorOf(k1))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k2)) CheckTrue(c, !k4.IsDescendantOf(k2))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k1)) CheckTrue(c, !k4.IsDescendantOf(k1))
CheckTrue(c, k2.IsDescendantOf(NewKey("/A"))) CheckTrue(c, !k3.IsDescendantOf(k4))
CheckTrue(c, k1.IsDescendantOf(NewKey("/A"))) CheckTrue(c, !k4.IsAncestorOf(k3))
CheckTrue(c, !k2.IsAncestorOf(NewKey("/A"))) CheckTrue(c, k2.IsDescendantOf(k4))
CheckTrue(c, !k1.IsAncestorOf(NewKey("/A"))) CheckTrue(c, k1.IsDescendantOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k4))
CheckTrue(c, !k1.IsAncestorOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k2)) CheckTrue(c, !k2.IsAncestorOf(k2))
CheckTrue(c, !k1.IsAncestorOf(k1)) CheckTrue(c, !k1.IsAncestorOf(k1))
c.Check(k1.Child(NewKey("D")).String(), Equals, k2.String()) c.Check(k1.Child(NewKey("D")).String(), Equals, k2.String())
......
...@@ -39,7 +39,7 @@ var _ ds.Datastore = (*Datastore)(nil) ...@@ -39,7 +39,7 @@ var _ ds.Datastore = (*Datastore)(nil)
func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) { func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
for _, m := range d.mounts { for _, m := range d.mounts {
if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) { if m.Prefix.IsAncestorOf(key) {
s := strings.TrimPrefix(key.String(), m.Prefix.String()) s := strings.TrimPrefix(key.String(), m.Prefix.String())
k := ds.NewKey(s) k := ds.NewKey(s)
return m.Datastore, m.Prefix, k return m.Datastore, m.Prefix, k
...@@ -159,21 +159,21 @@ func (h *querySet) next() (query.Result, bool) { ...@@ -159,21 +159,21 @@ func (h *querySet) next() (query.Result, bool) {
// /aoe/ not matching // /aoe/ not matching
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) { func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
for _, m := range d.mounts { for _, m := range d.mounts {
p := m.Prefix.String() if m.Prefix.IsDescendantOf(key) {
if len(p) > 1 {
p = p + "/"
}
if strings.HasPrefix(p, key.String()) {
dst = append(dst, m.Datastore) dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix) mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/")) rest = append(rest, ds.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) { } else if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
r := strings.TrimPrefix(key.String(), m.Prefix.String()) r := strings.TrimPrefix(key.String(), m.Prefix.String())
dst = append(dst, m.Datastore) dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix) mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r)) rest = append(rest, ds.NewKey(r))
// We've found an ancestor (or equal) key. We might have
// more general datastores, but they won't contain keys
// with this prefix so there's no point in searching them.
break
} }
} }
return dst, mountpoint, rest return dst, mountpoint, rest
...@@ -191,15 +191,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) error { ...@@ -191,15 +191,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Sync(prefix ds.Key) error { func (d *Datastore) Sync(prefix ds.Key) error {
// Sync all mount points below the prefix // Sync all mount points below the prefix
// Sync the mount point right at (or above) the prefix // Sync the mount point right at (or above) the prefix
dstores, mountPts, rest := d.lookupAll(prefix) dstores, _, rest := d.lookupAll(prefix)
for i, suffix := range rest { for i, suffix := range rest {
if err := dstores[i].Sync(suffix); err != nil { if err := dstores[i].Sync(suffix); err != nil {
return err return err
} }
if mountPts[i].Equal(prefix) || suffix.String() != "/" {
return nil
}
} }
return nil return nil
......
...@@ -268,49 +268,57 @@ func TestQueryAcrossMounts(t *testing.T) { ...@@ -268,49 +268,57 @@ func TestQueryAcrossMounts(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
res, err := m.Query(query.Query{Prefix: "/ba"}) expect := func(prefix string, values map[string]string) {
if err != nil { t.Helper()
t.Fatalf("Query fail: %v\n", err) res, err := m.Query(query.Query{Prefix: prefix})
}
entries, err := res.Rest()
if err != nil {
err = res.Close()
if err != nil { if err != nil {
t.Errorf("result.Close failed %d", err) t.Fatalf("Query fail: %v\n", err)
} }
t.Fatalf("Query Results.Rest fail: %v\n", err) entries, err := res.Rest()
} if err != nil {
seen := 0 err = res.Close()
if err != nil {
expect := map[string]string{ t.Errorf("result.Close failed %d", err)
"/foo/lorem": "y u here", }
"/bar/ipsum": "234", t.Fatalf("Query Results.Rest fail: %v\n", err)
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
} }
if len(entries) != len(values) {
if v != string(e.Value) { t.Errorf("expected %d results, got %d", len(values), len(entries))
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
} }
for _, e := range entries {
v, ok := values[e.Key]
if !ok {
t.Errorf("unexpected key %s", e.Key)
continue
}
expect[e.Key] = "seen" if v != string(e.Value) {
seen++ t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
} }
if seen != 4 { values[e.Key] = "seen"
t.Errorf("expected to see 3 values, saw %d", seen) }
} }
err = res.Close() expect("/ba", nil)
if err != nil { expect("/bar", map[string]string{
t.Errorf("result.Close failed %d", err) "/bar/ipsum": "234",
} "/bar/dolor": "345",
})
expect("/baz/", map[string]string{
"/baz/sit": "456",
})
expect("/foo", map[string]string{
"/foo/lorem": "123",
})
expect("/", map[string]string{
"/foo/lorem": "123",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
})
expect("/banana", nil)
} }
func TestQueryAcrossMountsWithSort(t *testing.T) { func TestQueryAcrossMountsWithSort(t *testing.T) {
......
...@@ -122,7 +122,6 @@ func (ks *DSSuite) TestQuery(c *C) { ...@@ -122,7 +122,6 @@ func (ks *DSSuite) TestQuery(c *C) {
c.Check(err, Equals, nil) c.Check(err, Equals, nil)
expect = []dsq.Entry{ expect = []dsq.Entry{
{Key: "/bar", Size: len([]byte("/foo/bar")), Value: []byte("/foo/bar")},
{Key: "/bar/baz", Size: len([]byte("/foo/bar/baz")), Value: []byte("/foo/bar/baz")}, {Key: "/bar/baz", Size: len([]byte("/foo/bar/baz")), Value: []byte("/foo/bar/baz")},
} }
......
package query package query
import ( import (
"path"
goprocess "github.com/jbenet/goprocess" goprocess "github.com/jbenet/goprocess"
) )
...@@ -116,7 +118,23 @@ func NaiveOrder(qr Results, orders ...Order) Results { ...@@ -116,7 +118,23 @@ func NaiveOrder(qr Results, orders ...Order) Results {
func NaiveQueryApply(q Query, qr Results) Results { func NaiveQueryApply(q Query, qr Results) Results {
if q.Prefix != "" { if q.Prefix != "" {
qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix}) // Clean the prefix as a key and append / so a prefix of /bar
// only finds /bar/baz, not /barbaz.
prefix := q.Prefix
if len(prefix) == 0 {
prefix = "/"
} else {
if prefix[0] != '/' {
prefix = "/" + prefix
}
prefix = path.Clean(prefix)
}
// If the prefix isn't "/", end it in a "/" so we only find keys
// _under_ the prefix.
if prefix != "/" {
prefix += "/"
}
qr = NaiveFilter(qr, FilterKeyPrefix{prefix})
} }
for _, f := range q.Filters { for _, f := range q.Filters {
qr = NaiveFilter(qr, f) qr = NaiveFilter(qr, f)
......
...@@ -16,6 +16,8 @@ var sampleKeys = []string{ ...@@ -16,6 +16,8 @@ var sampleKeys = []string{
} }
func testResults(t *testing.T, res Results, expect []string) { func testResults(t *testing.T, res Results, expect []string) {
t.Helper()
actualE, err := res.Rest() actualE, err := res.Rest()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -37,6 +39,7 @@ func testResults(t *testing.T, res Results, expect []string) { ...@@ -37,6 +39,7 @@ func testResults(t *testing.T, res Results, expect []string) {
func TestNaiveQueryApply(t *testing.T) { func TestNaiveQueryApply(t *testing.T) {
testNaiveQueryApply := func(t *testing.T, query Query, keys []string, expect []string) { testNaiveQueryApply := func(t *testing.T, query Query, keys []string, expect []string) {
t.Helper()
e := make([]Entry, len(keys)) e := make([]Entry, len(keys))
for i, k := range keys { for i, k := range keys {
e[i] = Entry{Key: k} e[i] = Entry{Key: k}
...@@ -71,9 +74,6 @@ func TestNaiveQueryApply(t *testing.T) { ...@@ -71,9 +74,6 @@ func TestNaiveQueryApply(t *testing.T) {
testNaiveQueryApply(t, q, sampleKeys, []string{ testNaiveQueryApply(t, q, sampleKeys, []string{
"/ab/c", "/ab/c",
"/ab/cd", "/ab/cd",
"/abce",
"/abcf",
"/ab",
}) })
q = Query{Orders: []Order{OrderByKeyDescending{}}} q = Query{Orders: []Order{OrderByKeyDescending{}}}
...@@ -88,14 +88,12 @@ func TestNaiveQueryApply(t *testing.T) { ...@@ -88,14 +88,12 @@ func TestNaiveQueryApply(t *testing.T) {
q = Query{ q = Query{
Limit: 3, Limit: 3,
Offset: 2, Offset: 1,
Prefix: "/ab", Prefix: "/ab",
Orders: []Order{OrderByKey{}}, Orders: []Order{OrderByKey{}},
} }
testNaiveQueryApply(t, q, sampleKeys, []string{ testNaiveQueryApply(t, q, sampleKeys, []string{
"/ab/cd", "/ab/cd",
"/abce",
"/abcf",
}) })
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment