Commit 653c9af9 authored by Jeromy Johnson, committed by GitHub

Merge pull request #9 from ipfs/kevina/convert

Provide a utility to convert from one sharding type to another.
parents 9553971a 51a1252c
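A minimal sketch of how the conversion helpers added in this merge might be driven programmatically instead of through the flatfs command-line tool; it assumes the github.com/ipfs/go-ds-flatfs import path used below and a hypothetical existing v1 datastore in a "blocks" directory:

package main

import (
	"log"
	"os"

	"github.com/ipfs/go-ds-flatfs"
)

func main() {
	// create a fresh datastore that uses the next-to-last/2 sharding function
	if err := flatfs.Create("blocks-new", flatfs.NextToLast(2)); err != nil {
		log.Fatal(err)
	}
	// move every key (and any stray files) from the old datastore into the new one,
	// writing progress to stderr
	if err := flatfs.Move("blocks", "blocks-new", os.Stderr); err != nil {
		log.Fatal(err)
	}
	// the old directory should now be empty; remove it and swap the new one into place
	if err := os.Remove("blocks"); err != nil {
		log.Fatal(err)
	}
	if err := os.Rename("blocks-new", "blocks"); err != nil {
		log.Fatal(err)
	}
}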
......@@ -12,7 +12,7 @@ install: true
script:
- make deps
- gx-go rewrite
- go test -race -coverprofile=unittest.coverprofile -covermode=atomic ./...
- go test -race -coverprofile=unittest.coverprofile -covermode=atomic .
after_success:
......
// Package flatfs is a Datastore implementation that stores all
// objects in a two-level directory structure in the local file
// system, regardless of the hierarchy of the keys.
package flatfs
import (
"fmt"
"io"
"os"
"path/filepath"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/go-os-rename"
)
func UpgradeV0toV1(path string, prefixLen int) error {
fun := Prefix(prefixLen)
err := WriteShardFunc(path, fun)
if err != nil {
return err
}
err = WriteReadme(path, fun)
if err != nil {
return err
}
return nil
}
func DowngradeV1toV0(path string) error {
fun, err := ReadShardFunc(path)
if err != nil {
return err
} else if fun.funName != "prefix" {
return fmt.Errorf("%s: can only downgrade datastore that use the 'prefix' sharding function", path)
}
err = os.Remove(filepath.Join(path, SHARDING_FN))
if err != nil {
return err
}
err = os.Remove(filepath.Join(path, README_FN))
if err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func Move(oldPath string, newPath string, out io.Writer) error {
oldDS, err := Open(oldPath, false)
if err != nil {
return fmt.Errorf("%s: %v", oldPath, err)
}
newDS, err := Open(newPath, false)
if err != nil {
return fmt.Errorf("%s: %v", newPath, err)
}
res, err := oldDS.Query(query.Query{KeysOnly: true})
if err != nil {
return err
}
if out != nil {
fmt.Fprintf(out, "Moving Keys...\n")
}
// first move the keys
count := 0
for {
e, ok := res.NextSync()
if !ok {
break
}
if e.Error != nil {
return e.Error
}
err := moveKey(oldDS, newDS, datastore.RawKey(e.Key))
if err != nil {
return err
}
count++
if out != nil && count%10 == 0 {
fmt.Fprintf(out, "\r%d keys so far", count)
}
}
if out != nil {
fmt.Fprintf(out, "\nCleaning Up...\n")
}
// now walk the old top-level directory
dir, err := os.Open(oldDS.path)
if err != nil {
return err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return err
}
for _, fn := range names {
if fn == "." || fn == ".." {
continue
}
oldPath := filepath.Join(oldDS.path, fn)
inf, err := os.Stat(oldPath)
if err != nil {
return err
}
if inf.IsDir() || fn == SHARDING_FN || fn == README_FN {
// this should be an empty shard directory or a generated file, so just remove it
err := os.Remove(oldPath)
if err != nil {
return err
}
} else {
// else we found something unexpected, so to be safe just move it
log.Warningf("found unexpected file in datastore directory: \"%s\", moving anyway", fn)
newPath := filepath.Join(newDS.path, fn)
err := osrename.Rename(oldPath, newPath)
if err != nil {
return err
}
}
}
if out != nil {
fmt.Fprintf(out, "All Done.\n")
}
return nil
}
func moveKey(oldDS *Datastore, newDS *Datastore, key datastore.Key) error {
_, oldPath := oldDS.encode(key)
dir, newPath := newDS.encode(key)
err := newDS.makeDirNoSync(dir)
if err != nil {
return err
}
err = osrename.Rename(oldPath, newPath)
if err != nil {
return err
}
return nil
}
package flatfs_test
import (
"bytes"
"encoding/hex"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ds-flatfs"
)
func TestMove(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
v1dir := filepath.Join(tempdir, "v1")
createDatastore(t, v1dir, flatfs.Prefix(3))
err := ioutil.WriteFile(filepath.Join(v1dir, "README_ALSO"), []byte("something"), 0666)
if err != nil {
t.Fatalf("WriteFile fail: %v\n", err)
}
keys, blocks := populateDatastore(t, v1dir)
v2dir := filepath.Join(tempdir, "v2")
createDatastore(t, v2dir, flatfs.NextToLast(2))
err = flatfs.Move(v1dir, v2dir, nil)
if err != nil {
t.Fatalf("%v\n", err)
}
// make sure the old directory is empty
rmEmptyDatastore(t, v1dir)
// make sure the stray README_ALSO file was moved
_, err = os.Stat(filepath.Join(v2dir, "README_ALSO"))
if err != nil {
t.Fatalf(err.Error())
}
// check that all keys are available
checkKeys(t, v2dir, keys, blocks)
// check that a key is in the correct format
shard := filepath.Join(v2dir, flatfs.NextToLast(2).Func()(keys[0].String()))
_, err = os.Stat(shard)
if err != nil {
t.Fatalf(err.Error())
}
}
func TestMoveRestart(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
v1dir := filepath.Join(tempdir, "v1")
v2dir := filepath.Join(tempdir, "v2")
createDatastore(t, v1dir, flatfs.Prefix(3))
createDatastore(t, v2dir, flatfs.NextToLast(5))
keys, blocks := populateDatastore(t, v1dir)
checkKeys(t, v1dir, keys, blocks)
// get a directory in the datastore
noslash := keys[0].String()[1:]
aDir := filepath.Join(tempdir, "v1", flatfs.Prefix(3).Func()(noslash))
// create a permission problem on the directory
err := os.Chmod(aDir, 0500)
if err != nil {
t.Fatalf("%v\n", err)
}
// try the move; it should fail partway through
err = flatfs.Move(v1dir, v2dir, nil)
if err == nil {
t.Fatalf("Move should have failed.")
}
// undoing the move should succeed
err = flatfs.Move(v2dir, v1dir, nil)
if err != nil {
t.Fatalf("Could not undo the move: %v\n", err)
}
checkKeys(t, v1dir, keys, blocks)
// there should be nothing left in the new datastore
rmEmptyDatastore(t, v2dir)
// try the move again; it should fail again
createDatastore(t, v2dir, flatfs.NextToLast(2))
err = flatfs.Move(v1dir, v2dir, nil)
if err == nil {
t.Fatalf("Move should have failed.")
}
// fix the permission problem
err = os.Chmod(aDir, 0700)
if err != nil {
t.Fatalf("%v\n", err)
}
// restart the move, it should be okay now
err = flatfs.Move(v1dir, v2dir, nil)
if err != nil {
t.Fatalf("Move not okay: %v\n", err)
}
// make sure everything moved by removing the old directory
rmEmptyDatastore(t, v1dir)
// make sure everything moved by checking all keys
checkKeys(t, v2dir, keys, blocks)
// check that a key is in the correct format
shard := filepath.Join(v2dir, flatfs.NextToLast(2).Func()(keys[0].String()))
_, err = os.Stat(shard)
if err != nil {
t.Fatalf(err.Error())
}
}
func TestUpgradeDowngrade(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
createDatastore(t, tempdir, flatfs.Prefix(3))
keys, blocks := populateDatastore(t, tempdir)
checkKeys(t, tempdir, keys, blocks)
err := flatfs.UpgradeV0toV1(tempdir, 3)
if err == nil {
t.Fatalf("UpgradeV0toV1 on already v1 should fail.")
}
err = flatfs.DowngradeV1toV0(tempdir)
if err != nil {
t.Fatalf("DowngradeV1toV0 fail: %v\n", err)
}
_, err = os.Stat(filepath.Join(tempdir, flatfs.SHARDING_FN))
if err == nil {
t.Fatalf("%v not in v0 format, SHARDING FILE exists")
} else if !os.IsNotExist(err) {
t.Fatalf("Stat fail: %v\n", err)
}
err = flatfs.UpgradeV0toV1(tempdir, 3)
if err != nil {
t.Fatalf("UpgradeV0toV1 fail %v\n", err)
}
// this will fail unless the repository is back in the v1 format
checkKeys(t, tempdir, keys, blocks)
}
func TestDowngradeNonPrefix(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
createDatastore(t, tempdir, flatfs.NextToLast(2))
err := flatfs.DowngradeV1toV0(tempdir)
if err == nil {
t.Fatalf("DowngradeV1toV0 should have failed", err)
}
}
func createDatastore(t *testing.T, dir string, fun *flatfs.ShardIdV1) {
err := flatfs.Create(dir, fun)
if err != nil {
t.Fatalf("Create fail: %s: %v\n", dir, err)
}
}
func rmEmptyDatastore(t *testing.T, dir string) {
err := os.Remove(dir)
if err != nil {
t.Fatalf("Remove fail: %v\n", err)
}
}
func populateDatastore(t *testing.T, dir string) ([]datastore.Key, [][]byte) {
ds, err := flatfs.Open(dir, false)
if err != nil {
t.Fatalf("Open fail: %v\n", err)
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < 256; i++ {
blk := make([]byte, 1000)
r.Read(blk)
blocks = append(blocks, blk)
key := "x" + hex.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
err := ds.Put(keys[i], blocks[i])
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
}
return keys, blocks
}
func checkKeys(t *testing.T, dir string, keys []datastore.Key, blocks [][]byte) {
ds, err := flatfs.Open(dir, false)
if err != nil {
t.Fatalf("Open fail: %v\n", err)
}
for i, key := range keys {
data, err := ds.Get(key)
if err != nil {
t.Fatalf("Get fail: %v\n", err)
}
if !bytes.Equal(data.([]byte), blocks[i]) {
t.Fatalf("block context differ for key %s\n", key.String())
}
}
}
package main
import (
"fmt"
"os"
"strconv"
"github.com/ipfs/go-ds-flatfs"
)
// To convert from the old (v0) format to the new format with a different
// sharding function use:
//   flatfs upgrade blocks 5
//   flatfs create blocks-new v1/next-to-last/2
//   flatfs move blocks blocks-new
//   rmdir blocks
//   mv blocks-new blocks
// To do the reverse:
//   flatfs create blocks-new v1/prefix/5
//   flatfs move blocks blocks-new
//   rmdir blocks
//   mv blocks-new blocks
//   flatfs downgrade blocks
func usage() {
fmt.Fprintf(os.Stderr, "usage: %s create DIR SHARDFUN | upgrade DIR PREFIXLEN | downgrade DIR | move OLDDIR NEWDIR\n", os.Args[0])
os.Exit(1)
}
func fail(err error) {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
func main() {
if len(os.Args) < 2 {
usage()
}
switch os.Args[1] {
case "create":
if len(os.Args) != 4 {
usage()
}
dir := os.Args[2]
funStr := os.Args[3]
if funStr[0] != '/' {
if funStr[0] != 'v' { // add the version prefix if not provided
funStr = "v1/" + funStr
}
funStr = flatfs.PREFIX + funStr
}
fun, err := flatfs.ParseShardFunc(funStr)
if err != nil {
fail(err)
}
err = flatfs.Create(dir, fun)
if err != nil {
fail(err)
}
case "upgrade":
if len(os.Args) != 4 {
usage()
}
dir := os.Args[2]
prefixLen, err := strconv.Atoi(os.Args[3])
if err != nil {
fail(err)
}
err = flatfs.UpgradeV0toV1(dir, prefixLen)
if err != nil {
fail(err)
}
case "downgrade":
if len(os.Args) != 3 {
usage()
}
dir := os.Args[2]
err := flatfs.DowngradeV1toV0(dir)
if err != nil {
fail(err)
}
case "move":
if len(os.Args) != 4 {
usage()
}
oldDir := os.Args[2]
newDir := os.Args[3]
err := flatfs.Move(oldDir, newDir, os.Stderr)
if err != nil {
fail(err)
}
default:
usage()
}
}
......@@ -9,13 +9,12 @@ import (
"path/filepath"
"runtime"
"testing"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
"github.com/ipfs/go-ds-flatfs"
randbo "github.com/dustin/randbo"
)
func tempdir(t testing.TB) (path string, cleanup func()) {
......@@ -185,9 +184,9 @@ func testStorage(p *params, t *testing.T) {
return err
}
switch path {
case ".", "..", flatfs.SHARDING_FN:
case ".", "..", "SHARDING":
// ignore
case flatfs.README_FN:
case "_README":
_, err := ioutil.ReadFile(absPath)
if err != nil {
t.Error("could not read _README file")
......@@ -465,7 +464,7 @@ func TestNoCluster(t *testing.T) {
t.Fatalf("New fail: %v\n", err)
}
r := randbo.NewFrom(rand.NewSource(0))
r := rand.New(rand.NewSource(time.Now().UnixNano()))
N := 3200 // should be divisible by 32 so the math works out
for i := 0; i < N; i++ {
blk := make([]byte, 1000)
......@@ -483,7 +482,7 @@ func TestNoCluster(t *testing.T) {
t.Fatalf("ReadDir fail: %v\n", err)
}
idealFilesPerDir := float64(N) / 32.0
tolerance := math.Floor(idealFilesPerDir * 0.20)
tolerance := math.Floor(idealFilesPerDir * 0.50)
count := 0
for _, dir := range dirs {
if dir.Name() == flatfs.SHARDING_FN || dir.Name() == flatfs.README_FN {
......@@ -506,7 +505,7 @@ func TestNoCluster(t *testing.T) {
}
func BenchmarkConsecutivePut(b *testing.B) {
r := randbo.New()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
......@@ -536,7 +535,7 @@ func BenchmarkConsecutivePut(b *testing.B) {
}
func BenchmarkBatchedPut(b *testing.B) {
r := randbo.New()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
......
......@@ -15,7 +15,7 @@ var IPFS_DEF_SHARD_STR = IPFS_DEF_SHARD.String()
const PREFIX = "/repo/flatfs/shard/"
const SHARDING_FN = "SHARDING"
const README_FN = "README"
const README_FN = "_README"
type ShardIdV1 struct {
funName string
......@@ -121,7 +121,7 @@ func ReadShardFunc(dir string) (*ShardIdV1, error) {
}
func WriteShardFunc(dir string, id *ShardIdV1) error {
file, err := os.Create(filepath.Join(dir, SHARDING_FN))
file, err := os.OpenFile(filepath.Join(dir, SHARDING_FN), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return err
}
......