Commit 8996e78c authored by Jeromy

Extract flatfs into its own separate package

parent 6359c75d
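For reference, a minimal usage sketch of the extracted package, mirroring the calls exercised in the tests below; the root directory and the key are illustrative only, and error handling is reduced to panics for brevity:

package main

import (
	"fmt"
	"os"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/flatfs"
)

func main() {
	// flatfs.New does not create the root directory, so make it first.
	root := "./flatfs-example"
	if err := os.MkdirAll(root, 0777); err != nil {
		panic(err)
	}
	// 2-character prefix directories, fsync disabled.
	fs, err := flatfs.New(root, 2, false)
	if err != nil {
		panic(err)
	}
	if err := fs.Put(datastore.NewKey("quux"), []byte("foobar")); err != nil {
		panic(err)
	}
	v, err := fs.Get(datastore.NewKey("quux"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", v.([]byte))
}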
// Package flatfs is a Datastore implementation that stores all
// objects in a two-level directory structure in the local file
// system, regardless of the hierarchy of the keys.
package flatfs
import (
"errors"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/go-os-rename"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("flatfs")
const (
extension = ".data"
maxPrefixLen = 16
)
var (
ErrBadPrefixLen = errors.New("bad prefix length")
)
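// Datastore is a flat-file datastore: each object is stored in its own
// file, grouped into prefix directories derived from the key.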
type Datastore struct {
path string
// length of the dir splay prefix
prefixLen int
// synchronize all writes and directory changes for added safety
sync bool
}
var _ datastore.Datastore = (*Datastore)(nil)
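// New returns a Datastore rooted at path. Keys are spread across
// subdirectories named by the first prefixLen characters of the key;
// prefixLen must be between 1 and maxPrefixLen. When sync is true,
// every write is followed by an fsync of the file and its directory.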
func New(path string, prefixLen int, sync bool) (*Datastore, error) {
if prefixLen <= 0 || prefixLen > maxPrefixLen {
return nil, ErrBadPrefixLen
}
fs := &Datastore{
path: path,
prefixLen: prefixLen,
sync: sync,
}
return fs, nil
}
var padding = strings.Repeat("_", maxPrefixLen)
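// encode maps a key to the prefix directory and file that store it.
// The prefix is the first prefixLen characters of the key (padded with
// '_' for short keys); with prefixLen 2, the key "/quux" is stored as
// "qu/quux.data" under the datastore root.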
func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
noslash := key.String()[1:]
prefix := (noslash + padding)[:fs.prefixLen]
dir = path.Join(fs.path, prefix)
file = path.Join(dir, noslash+extension)
return dir, file
}
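// decode is the inverse of encode: it recovers the key from a stored
// file name, reporting ok=false for files without the ".data" extension.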
func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
if path.Ext(file) != extension {
return datastore.Key{}, false
}
name := file[:len(file)-len(extension)]
return datastore.NewKey(name), true
}
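// makePrefixDir creates a prefix directory if it does not already exist
// and, when sync is enabled, fsyncs the datastore root so the new
// directory entry is durable.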
func (fs *Datastore) makePrefixDir(dir string) error {
if err := fs.makePrefixDirNoSync(dir); err != nil {
return err
}
// In theory, if we create a new prefix dir and add a file to
// it, the creation of the prefix dir itself might not be
// durable yet. Sync the root dir after a successful mkdir of
// a prefix dir, just to be paranoid.
if fs.sync {
if err := syncDir(fs.path); err != nil {
return err
}
}
return nil
}
func (fs *Datastore) makePrefixDirNoSync(dir string) error {
if err := os.Mkdir(dir, 0777); err != nil {
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if !os.IsExist(err) {
return err
}
}
return nil
}
var putMaxRetries = 6
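// Put stores value (which must be a []byte) under key. Writes that fail
// with "too many open files" are retried up to putMaxRetries times with
// a growing backoff.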
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
val, ok := value.([]byte)
if !ok {
return datastore.ErrInvalidType
}
var err error
for i := 1; i <= putMaxRetries; i++ {
err = fs.doPut(key, val)
if err == nil {
break
}
if !strings.Contains(err.Error(), "too many open files") {
break
}
log.Errorf("too many open files, retrying in %dms", 100*i)
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
return err
}
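// doPut writes the value to a temporary file in the prefix directory and
// then renames it into place, so a crash never leaves a partially written
// object under its final name.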
func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
dir, path := fs.encode(key)
if err := fs.makePrefixDir(dir); err != nil {
return err
}
tmp, err := ioutil.TempFile(dir, "put-")
if err != nil {
return err
}
closed := false
removed := false
defer func() {
if !closed {
// silence errcheck
_ = tmp.Close()
}
if !removed {
// silence errcheck
_ = os.Remove(tmp.Name())
}
}()
if _, err := tmp.Write(val); err != nil {
return err
}
if fs.sync {
if err := tmp.Sync(); err != nil {
return err
}
}
if err := tmp.Close(); err != nil {
return err
}
closed = true
err = osrename.Rename(tmp.Name(), path)
if err != nil {
return err
}
removed = true
if fs.sync {
if err := syncDir(dir); err != nil {
return err
}
}
return nil
}
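// putMany is the batched form of Put: it writes every value to a
// temporary file first, then syncs, closes and renames them, and finally
// syncs the affected directories in one pass.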
func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
var dirsToSync []string
files := make(map[*os.File]string)
for key, value := range data {
val, ok := value.([]byte)
if !ok {
return datastore.ErrInvalidType
}
dir, path := fs.encode(key)
if err := fs.makePrefixDirNoSync(dir); err != nil {
return err
}
dirsToSync = append(dirsToSync, dir)
tmp, err := ioutil.TempFile(dir, "put-")
if err != nil {
return err
}
if _, err := tmp.Write(val); err != nil {
return err
}
files[tmp] = path
}
ops := make(map[*os.File]int)
defer func() {
for fi := range files {
val := ops[fi]
switch val {
case 0:
_ = fi.Close()
fallthrough
case 1:
_ = os.Remove(fi.Name())
}
}
}()
// Now we sync everything
// sync and close files
for fi := range files {
if fs.sync {
if err := fi.Sync(); err != nil {
return err
}
}
if err := fi.Close(); err != nil {
return err
}
// signify closed
ops[fi] = 1
}
// move files to their proper places
for fi, path := range files {
if err := osrename.Rename(fi.Name(), path); err != nil {
return err
}
// signify removed
ops[fi] = 2
}
// now sync the dirs for those files
if fs.sync {
for _, dir := range dirsToSync {
if err := syncDir(dir); err != nil {
return err
}
}
// sync top flatfs dir
if err := syncDir(fs.path); err != nil {
return err
}
}
return nil
}
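// Get returns the stored value as a []byte, or datastore.ErrNotFound if
// no object exists under key.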
func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
_, path := fs.encode(key)
data, err := ioutil.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, datastore.ErrNotFound
}
// no specific error to return, so just pass it through
return nil, err
}
return data, nil
}
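// Has reports whether an object is stored under key.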
func (fs *Datastore) Has(key datastore.Key) (exists bool, err error) {
_, path := fs.encode(key)
switch _, err := os.Stat(path); {
case err == nil:
return true, nil
case os.IsNotExist(err):
return false, nil
default:
return false, err
}
}
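// Delete removes the object stored under key, returning
// datastore.ErrNotFound if it does not exist.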
func (fs *Datastore) Delete(key datastore.Key) error {
_, path := fs.encode(key)
switch err := os.Remove(path); {
case err == nil:
return nil
case os.IsNotExist(err):
return datastore.ErrNotFound
default:
return err
}
}
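// Query walks the datastore directory tree and streams back every key.
// Only a keys-only query with no prefix, filters, orders, limit or
// offset is supported.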
func (fs *Datastore) Query(q query.Query) (query.Results, error) {
if (q.Prefix != "" && q.Prefix != "/") ||
len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 ||
!q.KeysOnly {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
return nil, errors.New("flatfs only supports listing all keys in random order")
}
reschan := make(chan query.Result, query.KeysOnlyBufSize)
go func() {
defer close(reschan)
err := filepath.Walk(fs.path, func(path string, info os.FileInfo, err error) error {
if os.IsNotExist(err) {
return nil
}
if err != nil {
log.Errorf("Walk func in Query got error: %v", err)
return err
}
if !info.Mode().IsRegular() || strings.HasPrefix(info.Name(), ".") {
return nil
}
key, ok := fs.decode(info.Name())
if !ok {
log.Warning("failed to decode entry in flatfs")
return nil
}
reschan <- query.Result{
Entry: query.Entry{
Key: key.String(),
},
}
return nil
})
if err != nil {
log.Warning("walk failed: ", err)
}
}()
return query.ResultsWithChan(q, reschan), nil
}
func (fs *Datastore) Close() error {
return nil
}
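// flatfsBatch buffers puts and deletes so they can be applied together
// in Commit.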
type flatfsBatch struct {
puts map[datastore.Key]interface{}
deletes map[datastore.Key]struct{}
ds *Datastore
}
func (fs *Datastore) Batch() (datastore.Batch, error) {
return &flatfsBatch{
puts: make(map[datastore.Key]interface{}),
deletes: make(map[datastore.Key]struct{}),
ds: fs,
}, nil
}
func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
bt.puts[key] = val
return nil
}
func (bt *flatfsBatch) Delete(key datastore.Key) error {
bt.deletes[key] = struct{}{}
return nil
}
func (bt *flatfsBatch) Commit() error {
if err := bt.ds.putMany(bt.puts); err != nil {
return err
}
for k := range bt.deletes {
if err := bt.ds.Delete(k); err != nil {
return err
}
}
return nil
}
var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)
func (*Datastore) IsThreadSafe() {}
package flatfs_test
import (
"encoding/base32"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/flatfs"
"github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
rand "github.com/dustin/randbo"
)
func tempdir(t testing.TB) (path string, cleanup func()) {
path, err := ioutil.TempDir("", "test-datastore-flatfs-")
if err != nil {
t.Fatalf("cannot create temp directory: %v", err)
}
cleanup = func() {
if err := os.RemoveAll(path); err != nil {
t.Errorf("tempdir cleanup failed: %v", err)
}
}
return path, cleanup
}
func TestBadPrefixLen(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
for i := 0; i > -3; i-- {
_, err := flatfs.New(temp, i, false)
if g, e := err, flatfs.ErrBadPrefixLen; g != e {
t.Errorf("expected ErrBadPrefixLen, got: %v", g)
}
}
}
func TestPutBadValueType(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), 22)
if g, e := err, datastore.ErrInvalidType; g != e {
t.Fatalf("expected ErrInvalidType, got: %v\n", g)
}
}
func TestPut(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
}
func TestGet(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
const input = "foobar"
err = fs.Put(datastore.NewKey("quux"), []byte(input))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
data, err := fs.Get(datastore.NewKey("quux"))
if err != nil {
t.Fatalf("Get failed: %v", err)
}
buf, ok := data.([]byte)
if !ok {
t.Fatalf("expected []byte from Get, got %T: %v", data, data)
}
if g, e := string(buf), input; g != e {
t.Fatalf("Get gave wrong content: %q != %q", g, e)
}
}
func TestPutOverwrite(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
const (
loser = "foobar"
winner = "xyzzy"
)
err = fs.Put(datastore.NewKey("quux"), []byte(loser))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), []byte(winner))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
data, err := fs.Get(datastore.NewKey("quux"))
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if g, e := string(data.([]byte)), winner; g != e {
t.Fatalf("Get gave wrong content: %q != %q", g, e)
}
}
func TestGetNotFoundError(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
_, err = fs.Get(datastore.NewKey("quux"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestStorage(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
const prefixLen = 2
const prefix = "qu"
const target = prefix + string(os.PathSeparator) + "quux.data"
fs, err := flatfs.New(temp, prefixLen, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
seen := false
walk := func(absPath string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
path, err := filepath.Rel(temp, absPath)
if err != nil {
return err
}
switch path {
case ".", "..":
// ignore
case prefix:
if !fi.IsDir() {
t.Errorf("prefix directory is not a file? %v", fi.Mode())
}
// we know it's there if we see the file, nothing more to
// do here
case target:
seen = true
if !fi.Mode().IsRegular() {
t.Errorf("expected a regular file, mode: %04o", fi.Mode())
}
if runtime.GOOS != "windows" {
if g, e := fi.Mode()&os.ModePerm&0007, os.FileMode(0000); g != e {
t.Errorf("file should not be world accessible: %04o", fi.Mode())
}
}
default:
t.Errorf("saw unexpected directory entry: %q %v", path, fi.Mode())
}
return nil
}
if err := filepath.Walk(temp, walk); err != nil {
t.Fatal("walk: %v", err)
}
if !seen {
t.Error("did not see the data file")
}
}
func TestHasNotFound(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
found, err := fs.Has(datastore.NewKey("quux"))
if err != nil {
t.Fatalf("Has fail: %v\n", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong Has: %v != %v", g, e)
}
}
func TestHasFound(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
found, err := fs.Has(datastore.NewKey("quux"))
if err != nil {
t.Fatalf("Has fail: %v\n", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong Has: %v != %v", g, e)
}
}
func TestDeleteNotFound(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Delete(datastore.NewKey("quux"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestDeleteFound(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
err = fs.Put(datastore.NewKey("quux"), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
err = fs.Delete(datastore.NewKey("quux"))
if err != nil {
t.Fatalf("Delete fail: %v\n", err)
}
// check that it's gone
_, err = fs.Get(datastore.NewKey("quux"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected Get after Delete to give ErrNotFound, got: %v\n", g)
}
}
func TestQuerySimple(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
const myKey = "quux"
err = fs.Put(datastore.NewKey(myKey), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
res, err := fs.Query(query.Query{KeysOnly: true})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := false
for _, e := range entries {
switch e.Key {
case datastore.NewKey(myKey).String():
seen = true
default:
t.Errorf("saw unexpected key: %q", e.Key)
}
}
if !seen {
t.Errorf("did not see wanted key %q in %+v", myKey, entries)
}
}
func TestBatchPut(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
dstest.RunBatchTest(t, fs)
}
func TestBatchDelete(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
dstest.RunBatchDeleteTest(t, fs)
}
func BenchmarkConsecutivePut(b *testing.B) {
r := rand.New()
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
blk := make([]byte, 256*1024)
r.Read(blk)
blocks = append(blocks, blk)
key := base32.StdEncoding.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
}
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := fs.Put(keys[i], blocks[i])
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkBatchedPut(b *testing.B) {
r := rand.New()
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
blk := make([]byte, 256*1024)
r.Read(blk)
blocks = append(blocks, blk)
key := base32.StdEncoding.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
}
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, 2, false)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
b.ResetTimer()
for i := 0; i < b.N; {
batch, err := fs.Batch()
if err != nil {
b.Fatal(err)
}
for n := i; i-n < 512 && i < b.N; i++ {
err := batch.Put(keys[i], blocks[i])
if err != nil {
b.Fatal(err)
}
}
err = batch.Commit()
if err != nil {
b.Fatal(err)
}
}
}
// +build !windows

package flatfs
import "os"
func syncDir(dir string) error {
dirF, err := os.Open(dir)
if err != nil {
return err
}
defer dirF.Close()
if err := dirF.Sync(); err != nil {
return err
}
return nil
}
package flatfs
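// syncDir is a no-op in this build; syncing a directory is not
// supported on Windows.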
func syncDir(dir string) error {
return nil
}