Commit d1554dd0 authored by Kevin Atkinson

Store sharding function used in the repo.

Store the sharding function used by the repo in a file named "SHARDING".
To make this work, the sharding function is now always specified as a
string.
parent 94a54dfc
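
For context, a minimal sketch of how a caller constructs the datastore after this change (the import path "github.com/ipfs/go-ds-flatfs" and the repo directory are assumptions for illustration, not part of this diff). The shard function is now passed as a string such as "prefix/2"; "auto" means "use whatever is recorded in the repo's SHARDING file".

package main

import (
    "log"

    flatfs "github.com/ipfs/go-ds-flatfs" // assumed import path
)

func main() {
    // Previously a caller passed a ShardFunc value, e.g. flatfs.Prefix(2).
    // Now the shard function is a string; on first use it is also written
    // to the SHARDING file inside the datastore directory.
    fs, err := flatfs.New("/tmp/flatfs-repo", "prefix/2", false)
    if err != nil {
        log.Fatal(err)
    }
    log.Println(fs.ShardFunc()) // normalized spec, e.g. "v1/prefix/2"
    fs.Close()

    // Reopening the same directory with "auto" inherits the stored spec;
    // reopening with a conflicting spec would return an error.
    fs, err = flatfs.New("/tmp/flatfs-repo", "auto", false)
    if err != nil {
        log.Fatal(err)
    }
    defer fs.Close()
}
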
@@ -5,6 +5,7 @@ package flatfs
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
@@ -28,7 +29,8 @@ const (
 type Datastore struct {
     path string
-    getDir ShardFunc
+    shardFunc string
+    getDir    ShardFunc
     // sychronize all writes and directory changes for added safety
     sync bool
@@ -38,37 +40,51 @@ var _ datastore.Datastore = (*Datastore)(nil)
 type ShardFunc func(string) string
 
-func New(path string, getDir ShardFunc, sync bool) (*Datastore, error) {
-    fs := &Datastore{
-        path:   path,
-        getDir: getDir,
-        sync:   sync,
-    }
-    return fs, nil
-}
-
-func Prefix(prefixLen int) ShardFunc {
-    padding := strings.Repeat("_", prefixLen)
-    return func(noslash string) string {
-        return (noslash + padding)[:prefixLen]
-    }
-}
-
-func Suffix(suffixLen int) ShardFunc {
-    padding := strings.Repeat("_", suffixLen)
-    return func(noslash string) string {
-        str := padding + noslash
-        return str[len(str)-suffixLen:]
-    }
-}
-
-func NextToLast(suffixLen int) ShardFunc {
-    padding := strings.Repeat("_", suffixLen+1)
-    return func(noslash string) string {
-        str := padding + noslash
-        offset := len(str) - suffixLen - 1
-        return str[offset : offset+suffixLen]
-    }
-}
+func New(path string, fun0 string, sync bool) (*Datastore, error) {
+    fun0 = NormalizeShardFunc(fun0)
+    fun1, err := ReadShardFunc(path)
+    if err != nil {
+        return nil, err
+    }
+
+    fun := ""
+    switch {
+    case fun0 == "auto" && fun1 == "auto":
+        return nil, fmt.Errorf("shard function not specified")
+    case fun0 == "auto":
+        fun = fun1
+    case fun1 == "auto":
+        fun = fun0
+    case fun0 != fun1:
+        return nil, fmt.Errorf("specified shard func '%s' does not match repo shard func '%s'",
+            fun0, fun1)
+    default:
+        fun = fun0
+    }
+
+    getDir, err := ShardFuncFromString(fun)
+    if err != nil {
+        return nil, fmt.Errorf("unable to parse shard func: %v", err)
+    }
+
+    if fun1 == "auto" {
+        err := WriteShardFunc(path, fun)
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    fs := &Datastore{
+        path:      path,
+        shardFunc: fun,
+        getDir:    getDir,
+        sync:      sync,
+    }
+    return fs, nil
+}
+
+func (fs *Datastore) ShardFunc() string {
+    return fs.shardFunc
+}
 
 func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
@@ -2,6 +2,7 @@ package flatfs_test
import (
"encoding/base32"
"fmt"
"io/ioutil"
"math"
"math/rand"
@@ -32,17 +33,17 @@ func tempdir(t testing.TB) (path string, cleanup func()) {
return path, cleanup
}
func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) {
t.Run("prefix", func(t *testing.T) { testFunc(flatfs.Prefix, t) })
t.Run("suffix", func(t *testing.T) { testFunc(flatfs.Suffix, t) })
t.Run("next-to-last", func(t *testing.T) { testFunc(flatfs.NextToLast, t) })
func tryAllShardFuncs(t *testing.T, testFunc func(string, *testing.T)) {
t.Run("prefix", func(t *testing.T) { testFunc("prefix", t) })
t.Run("suffix", func(t *testing.T) { testFunc("suffix", t) })
t.Run("next-to-last", func(t *testing.T) { testFunc("next-to-last", t) })
}
func TestPutBadValueType(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
fs, err := flatfs.New(temp, "prefix/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -53,13 +54,11 @@ func TestPutBadValueType(t *testing.T) {
}
}
type mkShardFunc func(int) flatfs.ShardFunc
func testPut(dirFunc mkShardFunc, t *testing.T) {
func testPut(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -72,11 +71,11 @@ func testPut(dirFunc mkShardFunc, t *testing.T) {
func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
func testGet(dirFunc mkShardFunc, t *testing.T) {
func testGet(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -102,11 +101,11 @@ func testGet(dirFunc mkShardFunc, t *testing.T) {
func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
func testPutOverwrite(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -136,11 +135,11 @@ func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
func testGetNotFoundError(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -157,7 +156,7 @@ type params struct {
what string
dir string
key string
dirFunc mkShardFunc
dirFunc string
}
func testStorage(p *params, t *testing.T) {
@@ -165,7 +164,7 @@ func testStorage(p *params, t *testing.T) {
defer cleanup()
target := p.dir + string(os.PathSeparator) + p.key + ".data"
fs, err := flatfs.New(temp, p.dirFunc(len(p.dir)), false)
fs, err := flatfs.New(temp, fmt.Sprintf("%s/%d", p.dirFunc, len(p.dir)), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -185,7 +184,7 @@ func testStorage(p *params, t *testing.T) {
return err
}
switch path {
case ".", "..":
case ".", "..", "SHARDING":
// ignore
case p.dir:
if !fi.IsDir() {
@@ -222,7 +221,7 @@ func TestStorage(t *testing.T) {
what: "prefix",
dir: "qu",
key: "quux",
dirFunc: flatfs.Prefix,
dirFunc: "prefix",
}, t)
})
t.Run("suffix", func(t *testing.T) {
@@ -230,7 +229,7 @@ func TestStorage(t *testing.T) {
what: "suffix",
dir: "ux",
key: "quux",
dirFunc: flatfs.Suffix,
dirFunc: "suffix",
}, t)
})
t.Run("next-to-last", func(t *testing.T) {
@@ -238,16 +237,16 @@ func TestStorage(t *testing.T) {
what: "next-to-last",
dir: "uu",
key: "quux",
dirFunc: flatfs.NextToLast,
dirFunc: "next-to-last",
}, t)
})
}
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
func testHasNotFound(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -263,11 +262,11 @@ func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
func testHasFound(dirFunc mkShardFunc, t *testing.T) {
func testHasFound(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -287,11 +286,11 @@ func testHasFound(dirFunc mkShardFunc, t *testing.T) {
func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
func testDeleteNotFound(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -304,11 +303,11 @@ func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound) }
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
func testDeleteFound(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -331,7 +330,7 @@ func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
func testQuerySimple(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
@@ -340,7 +339,7 @@ func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
t.Fatalf("WriteFile fail: %v\n", err)
}
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -374,11 +373,11 @@ func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
func testBatchPut(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -388,11 +387,11 @@ func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
func testBatchDelete(dirFunc string, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, dirFunc(2), false)
fs, err := flatfs.New(temp, dirFunc+"/2", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -402,11 +401,44 @@ func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
func TestSHARDINGFile(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
fun := "next-to-last/2"
fs, err := flatfs.New(tempdir, fun, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
fs.Close()
fs, err = flatfs.New(tempdir, "auto", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
if fs.ShardFunc() != flatfs.NormalizeShardFunc(fun) {
t.Fatalf("Shard function in repo not %s", fun)
}
fs.Close()
fs, err = flatfs.New(tempdir, fun, false)
if err != nil {
t.Fatalf("Could not reopen repo: %v\n", err)
}
fs.Close()
fs, err = flatfs.New(tempdir, "prefix/5", false)
if err == nil {
t.Fatalf("Was able to open repo with incompatible sharding function")
}
}
func TestNoCluster(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(tempdir, flatfs.NextToLast(1), false)
fs, err := flatfs.New(tempdir, "next-to-last/1", false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
@@ -428,12 +460,14 @@ func TestNoCluster(t *testing.T) {
if err != nil {
t.Fatalf("ReadDir fail: %v\n", err)
}
if len(dirs) != 32 {
t.Fatalf("Expected 32 directories in %s", tempdir)
}
idealFilesPerDir := float64(N) / 32.0
tolerance := math.Floor(idealFilesPerDir * 0.20)
count := 0
for _, dir := range dirs {
if dir.Name() == "SHARDING" {
continue
}
count += 1
files, err := ioutil.ReadDir(filepath.Join(tempdir, dir.Name()))
if err != nil {
t.Fatalf("ReadDir fail: %v\n", err)
@@ -444,6 +478,9 @@ func TestNoCluster(t *testing.T) {
filepath.Join(tempdir, dir.Name()), num, idealFilesPerDir-tolerance, idealFilesPerDir+tolerance)
}
}
if count != 32 {
t.Fatalf("Expected 32 directories and one file in %s", tempdir)
}
}
func BenchmarkConsecutivePut(b *testing.B) {
@@ -461,7 +498,7 @@ func BenchmarkConsecutivePut(b *testing.B) {
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
fs, err := flatfs.New(temp, "prefix/2", false)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
@@ -491,7 +528,7 @@ func BenchmarkBatchedPut(b *testing.B) {
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, flatfs.Prefix(2), false)
fs, err := flatfs.New(temp, "prefix/2", false)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
package flatfs

import (
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "strconv"
    "strings"
)

type shardId struct {
    version string
    funName string
    param   string
}

func (f shardId) str() string {
    if f.funName == "" || f.funName == "auto" {
        return "auto"
    } else {
        return fmt.Sprintf("v1/%s/%s", f.funName, f.param)
    }
}

func parseShardFunc(str string) shardId {
    str = strings.TrimSpace(str)
    parts := strings.Split(str, "/")
    // ignore prefix for now
    if len(parts) > 3 {
        parts = parts[len(parts)-3:]
    }
    switch len(parts) {
    case 3:
        return shardId{version: parts[0], funName: parts[1], param: parts[2]}
    case 2:
        return shardId{funName: parts[0], param: parts[1]}
    case 1:
        return shardId{funName: parts[0]}
    default: // can only happen for len == 0
        return shardId{}
    }
}

func (f shardId) Func() (ShardFunc, error) {
    if f.version != "" && f.version != "v1" {
        return nil, fmt.Errorf("expected 'v1' for version string got: %s\n", f.version)
    }
    if f.param == "" {
        return nil, fmt.Errorf("'%s' function requires a parameter", f.funName)
    }
    len, err := strconv.Atoi(f.param)
    if err != nil {
        return nil, err
    }
    switch f.funName {
    case "prefix":
        return Prefix(len), nil
    case "suffix":
        return Suffix(len), nil
    case "next-to-last":
        return NextToLast(len), nil
    default:
        return nil, fmt.Errorf("expected 'prefix', 'suffix' or 'next-to-last' got: %s", f.funName)
    }
}

func NormalizeShardFunc(str string) string {
    return parseShardFunc(str).str()
}

func ShardFuncFromString(str string) (ShardFunc, error) {
    id := parseShardFunc(str)
    fun, err := id.Func()
    if err != nil {
        return nil, err
    }
    return fun, nil
}

func ReadShardFunc(dir string) (string, error) {
    fun := "auto"
    buf, err := ioutil.ReadFile(filepath.Join(dir, "SHARDING"))
    str := string(buf)
    if err == nil && len(str) != 0 {
        fun = NormalizeShardFunc(str)
    } else if err != nil && !os.IsNotExist(err) {
        return "", fmt.Errorf("unable to read shard function from repo: %v", err)
    }
    return fun, nil
}

func WriteShardFunc(dir, str string) error {
    file, err := os.Create(filepath.Join(dir, "SHARDING"))
    if err != nil {
        return err
    }
    defer file.Close()
    _, err = file.WriteString(str)
    if err != nil {
        return err
    }
    _, err = file.WriteString("\n")
    if err != nil {
        return err
    }
    return nil
}

func Prefix(prefixLen int) ShardFunc {
    padding := strings.Repeat("_", prefixLen)
    return func(noslash string) string {
        return (noslash + padding)[:prefixLen]
    }
}

func Suffix(suffixLen int) ShardFunc {
    padding := strings.Repeat("_", suffixLen)
    return func(noslash string) string {
        str := padding + noslash
        return str[len(str)-suffixLen:]
    }
}

func NextToLast(suffixLen int) ShardFunc {
    padding := strings.Repeat("_", suffixLen+1)
    return func(noslash string) string {
        str := padding + noslash
        offset := len(str) - suffixLen - 1
        return str[offset : offset+suffixLen]
    }
}
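
For reference, a small hypothetical example (not part of the commit) of the spec strings this file accepts, written as it might appear in a test file of the same package; it relies only on the helpers defined above plus "fmt":

func ExampleShardSpec() {
    // Accepted forms are "<name>/<param>" or "v1/<name>/<param>"; the
    // normalized form always carries the "v1/" prefix, and an empty or
    // "auto" spec normalizes to "auto".
    fmt.Println(NormalizeShardFunc("prefix/2"))          // v1/prefix/2
    fmt.Println(NormalizeShardFunc("v1/next-to-last/3")) // v1/next-to-last/3
    fmt.Println(NormalizeShardFunc(""))                  // auto

    // ShardFuncFromString turns a spec into the directory-naming function.
    getDir, err := ShardFuncFromString("next-to-last/2")
    if err != nil {
        panic(err)
    }
    fmt.Println(getDir("quux")) // uu
}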