Commit 0969a796 authored by Kevin Atkinson's avatar Kevin Atkinson

Support using suffix of key for directory in addition to the prefix.

parent be314e51
...@@ -22,18 +22,13 @@ import ( ...@@ -22,18 +22,13 @@ import (
var log = logging.Logger("flatfs") var log = logging.Logger("flatfs")
const ( const (
extension = ".data" extension = ".data"
maxPrefixLen = 16
)
var (
ErrBadPrefixLen = errors.New("bad prefix length")
) )
type Datastore struct { type Datastore struct {
path string path string
// length of the dir splay prefix
prefixLen int getDir ShardFunc
// sychronize all writes and directory changes for added safety // sychronize all writes and directory changes for added safety
sync bool sync bool
...@@ -41,24 +36,35 @@ type Datastore struct { ...@@ -41,24 +36,35 @@ type Datastore struct {
var _ datastore.Datastore = (*Datastore)(nil) var _ datastore.Datastore = (*Datastore)(nil)
func New(path string, prefixLen int, sync bool) (*Datastore, error) { type ShardFunc func(string) string
if prefixLen <= 0 || prefixLen > maxPrefixLen {
return nil, ErrBadPrefixLen func New(path string, getDir ShardFunc, sync bool) (*Datastore, error) {
}
fs := &Datastore{ fs := &Datastore{
path: path, path: path,
prefixLen: prefixLen, getDir: getDir,
sync: sync, sync: sync,
} }
return fs, nil return fs, nil
} }
var padding = strings.Repeat("_", maxPrefixLen) func Prefix(prefixLen int) ShardFunc {
padding := strings.Repeat("_", prefixLen)
return func(noslash string) string {
return (noslash + padding)[:prefixLen]
}
}
func Suffix(suffixLen int) ShardFunc {
padding := strings.Repeat("_", suffixLen)
return func(noslash string) string {
str := padding + noslash
return str[len(str)-suffixLen:]
}
}
func (fs *Datastore) encode(key datastore.Key) (dir, file string) { func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
noslash := key.String()[1:] noslash := key.String()[1:]
prefix := (noslash + padding)[:fs.prefixLen] dir = path.Join(fs.path, fs.getDir(noslash))
dir = path.Join(fs.path, prefix)
file = path.Join(dir, noslash+extension) file = path.Join(dir, noslash+extension)
return dir, file return dir, file
} }
...@@ -71,8 +77,8 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) { ...@@ -71,8 +77,8 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
return datastore.NewKey(name), true return datastore.NewKey(name), true
} }
func (fs *Datastore) makePrefixDir(dir string) error { func (fs *Datastore) makeDir(dir string) error {
if err := fs.makePrefixDirNoSync(dir); err != nil { if err := fs.makeDirNoSync(dir); err != nil {
return err return err
} }
...@@ -88,7 +94,7 @@ func (fs *Datastore) makePrefixDir(dir string) error { ...@@ -88,7 +94,7 @@ func (fs *Datastore) makePrefixDir(dir string) error {
return nil return nil
} }
func (fs *Datastore) makePrefixDirNoSync(dir string) error { func (fs *Datastore) makeDirNoSync(dir string) error {
if err := os.Mkdir(dir, 0777); err != nil { if err := os.Mkdir(dir, 0777); err != nil {
// EEXIST is safe to ignore here, that just means the prefix // EEXIST is safe to ignore here, that just means the prefix
// directory already existed. // directory already existed.
...@@ -126,7 +132,7 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error { ...@@ -126,7 +132,7 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
func (fs *Datastore) doPut(key datastore.Key, val []byte) error { func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
dir, path := fs.encode(key) dir, path := fs.encode(key)
if err := fs.makePrefixDir(dir); err != nil { if err := fs.makeDir(dir); err != nil {
return err return err
} }
...@@ -184,7 +190,7 @@ func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error { ...@@ -184,7 +190,7 @@ func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
return datastore.ErrInvalidType return datastore.ErrInvalidType
} }
dir, path := fs.encode(key) dir, path := fs.encode(key)
if err := fs.makePrefixDirNoSync(dir); err != nil { if err := fs.makeDirNoSync(dir); err != nil {
return err return err
} }
dirsToSync = append(dirsToSync, dir) dirsToSync = append(dirsToSync, dir)
......
...@@ -30,23 +30,11 @@ func tempdir(t testing.TB) (path string, cleanup func()) { ...@@ -30,23 +30,11 @@ func tempdir(t testing.TB) (path string, cleanup func()) {
return path, cleanup return path, cleanup
} }
func TestBadPrefixLen(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
for i := 0; i > -3; i-- {
_, err := flatfs.New(temp, i, false)
if g, e := err, flatfs.ErrBadPrefixLen; g != e {
t.Errorf("expected ErrBadPrefixLen, got: %v", g)
}
}
}
func TestPutBadValueType(t *testing.T) { func TestPutBadValueType(t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -57,11 +45,13 @@ func TestPutBadValueType(t *testing.T) { ...@@ -57,11 +45,13 @@ func TestPutBadValueType(t *testing.T) {
} }
} }
func TestPut(t *testing.T) { type mkShardFunc func(int) flatfs.ShardFunc
func testPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -72,11 +62,16 @@ func TestPut(t *testing.T) { ...@@ -72,11 +62,16 @@ func TestPut(t *testing.T) {
} }
} }
func TestGet(t *testing.T) { func TestPut(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testPut(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testPut(flatfs.Prefix, t) })
}
func testGet(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -100,11 +95,16 @@ func TestGet(t *testing.T) { ...@@ -100,11 +95,16 @@ func TestGet(t *testing.T) {
} }
} }
func TestPutOverwrite(t *testing.T) { func TestGet(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testGet(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testGet(flatfs.Prefix, t) })
}
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -132,11 +132,16 @@ func TestPutOverwrite(t *testing.T) { ...@@ -132,11 +132,16 @@ func TestPutOverwrite(t *testing.T) {
} }
} }
func TestGetNotFoundError(t *testing.T) { func TestPutOverwrite(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testPutOverwrite(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testPutOverwrite(flatfs.Prefix, t) })
}
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -147,19 +152,29 @@ func TestGetNotFoundError(t *testing.T) { ...@@ -147,19 +152,29 @@ func TestGetNotFoundError(t *testing.T) {
} }
} }
func TestStorage(t *testing.T) { func TestGetNotFoundError(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testGetNotFoundError(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testGetNotFoundError(flatfs.Prefix, t) })
}
type params struct {
what string
dir string
key string
dirFunc mkShardFunc
}
func testStorage(p *params, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
const prefixLen = 2 target := p.dir + string(os.PathSeparator) + p.key + ".data"
const prefix = "qu" fs, err := flatfs.New(temp, p.dirFunc(len(p.dir)), false)
const target = prefix + string(os.PathSeparator) + "quux.data"
fs, err := flatfs.New(temp, prefixLen, false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
err = fs.Put(datastore.NewKey("quux"), []byte("foobar")) err = fs.Put(datastore.NewKey(p.key), []byte("foobar"))
if err != nil { if err != nil {
t.Fatalf("Put fail: %v\n", err) t.Fatalf("Put fail: %v\n", err)
} }
...@@ -176,9 +191,9 @@ func TestStorage(t *testing.T) { ...@@ -176,9 +191,9 @@ func TestStorage(t *testing.T) {
switch path { switch path {
case ".", "..": case ".", "..":
// ignore // ignore
case prefix: case p.dir:
if !fi.IsDir() { if !fi.IsDir() {
t.Errorf("prefix directory is not a file? %v", fi.Mode()) t.Errorf("%s directory is not a file? %v", p.what, fi.Mode())
} }
// we know it's there if we see the file, nothing more to // we know it's there if we see the file, nothing more to
// do here // do here
...@@ -205,11 +220,30 @@ func TestStorage(t *testing.T) { ...@@ -205,11 +220,30 @@ func TestStorage(t *testing.T) {
} }
} }
func TestHasNotFound(t *testing.T) { func TestStorage(t *testing.T) {
t.Run("prefix", func(t *testing.T) {
testStorage(&params{
what: "prefix",
dir: "qu",
key: "quux",
dirFunc: flatfs.Prefix,
}, t)
})
t.Run("suffix", func(t *testing.T) {
testStorage(&params{
what: "suffix",
dir: "ux",
key: "quux",
dirFunc: flatfs.Suffix,
}, t)
})
}
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -223,11 +257,16 @@ func TestHasNotFound(t *testing.T) { ...@@ -223,11 +257,16 @@ func TestHasNotFound(t *testing.T) {
} }
} }
func TestHasFound(t *testing.T) { func TestHasNotFound(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testHasNotFound(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testHasNotFound(flatfs.Prefix, t) })
}
func testHasFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -245,11 +284,16 @@ func TestHasFound(t *testing.T) { ...@@ -245,11 +284,16 @@ func TestHasFound(t *testing.T) {
} }
} }
func TestDeleteNotFound(t *testing.T) { func TestHasFound(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testHasFound(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testHasFound(flatfs.Prefix, t) })
}
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -260,11 +304,16 @@ func TestDeleteNotFound(t *testing.T) { ...@@ -260,11 +304,16 @@ func TestDeleteNotFound(t *testing.T) {
} }
} }
func TestDeleteFound(t *testing.T) { func TestDeleteNotFound(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testDeleteNotFound(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testDeleteNotFound(flatfs.Prefix, t) })
}
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -285,11 +334,16 @@ func TestDeleteFound(t *testing.T) { ...@@ -285,11 +334,16 @@ func TestDeleteFound(t *testing.T) {
} }
} }
func TestQuerySimple(t *testing.T) { func TestDeleteFound(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testDeleteFound(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testDeleteFound(flatfs.Prefix, t) })
}
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -321,11 +375,16 @@ func TestQuerySimple(t *testing.T) { ...@@ -321,11 +375,16 @@ func TestQuerySimple(t *testing.T) {
} }
} }
func TestBatchPut(t *testing.T) { func TestQuerySimple(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testQuerySimple(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testQuerySimple(flatfs.Prefix, t) })
}
func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -333,11 +392,16 @@ func TestBatchPut(t *testing.T) { ...@@ -333,11 +392,16 @@ func TestBatchPut(t *testing.T) {
dstest.RunBatchTest(t, fs) dstest.RunBatchTest(t, fs)
} }
func TestBatchDelete(t *testing.T) { func TestBatchPut(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testBatchPut(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testBatchPut(flatfs.Prefix, t) })
}
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, dirFunc(2), false)
if err != nil { if err != nil {
t.Fatalf("New fail: %v\n", err) t.Fatalf("New fail: %v\n", err)
} }
...@@ -345,6 +409,11 @@ func TestBatchDelete(t *testing.T) { ...@@ -345,6 +409,11 @@ func TestBatchDelete(t *testing.T) {
dstest.RunBatchDeleteTest(t, fs) dstest.RunBatchDeleteTest(t, fs)
} }
func TestBatchDelete(t *testing.T) {
t.Run("prefix", func(t *testing.T) { testBatchDelete(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testBatchDelete(flatfs.Prefix, t) })
}
func BenchmarkConsecutivePut(b *testing.B) { func BenchmarkConsecutivePut(b *testing.B) {
r := rand.New() r := rand.New()
var blocks [][]byte var blocks [][]byte
...@@ -360,7 +429,7 @@ func BenchmarkConsecutivePut(b *testing.B) { ...@@ -360,7 +429,7 @@ func BenchmarkConsecutivePut(b *testing.B) {
temp, cleanup := tempdir(b) temp, cleanup := tempdir(b)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
if err != nil { if err != nil {
b.Fatalf("New fail: %v\n", err) b.Fatalf("New fail: %v\n", err)
} }
...@@ -390,7 +459,7 @@ func BenchmarkBatchedPut(b *testing.B) { ...@@ -390,7 +459,7 @@ func BenchmarkBatchedPut(b *testing.B) {
temp, cleanup := tempdir(b) temp, cleanup := tempdir(b)
defer cleanup() defer cleanup()
fs, err := flatfs.New(temp, 2, false) fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
if err != nil { if err != nil {
b.Fatalf("New fail: %v\n", err) b.Fatalf("New fail: %v\n", err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment