Commit 2de21bc0 authored by Jeromy Johnson, committed by GitHub

Merge pull request #3653 from ipfs/kevina/filestore-util

Basic Filestore Utilities
parents a3fc6f18 1eddb60f
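A minimal usage sketch (the commands and the config step mirror the sharness test added in this commit; somedir is a placeholder directory):

    ipfs config --json Experimental.FilestoreEnabled true
    ipfs add --nocopy --raw-leaves -r somedir
    ipfs filestore ls
    ipfs filestore verify
    ipfs filestore dups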
package commands
import (
"context"
"fmt"
cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/filestore"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
u "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
)
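// FileStoreCmd is the root of the 'ipfs filestore' command tree; it only
// groups the ls, verify and dups subcommands defined below.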
var FileStoreCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Interact with filestore objects.",
},
Subcommands: map[string]*cmds.Command{
"ls": lsFileStore,
"verify": verifyFileStore,
"dups": dupsFileStore,
},
}
var lsFileStore = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List objects in filestore.",
LongDescription: `
List objects in the filestore.
If one or more <obj> is specified, only list those specific objects;
otherwise list all objects.
The output is:
<hash> <size> <path> <offset>
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("obj", false, true, "Cid of objects to list."),
},
Run: func(req cmds.Request, res cmds.Response) {
_, fs, err := getFilestore(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
args := req.Arguments()
if len(args) > 0 {
out := perKeyActionToChan(args, func(c *cid.Cid) *filestore.ListRes {
return filestore.List(fs, c)
}, req.Context())
res.SetOutput(out)
} else {
next, err := filestore.ListAll(fs)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
out := listResToChan(next, req.Context())
res.SetOutput(out)
}
},
PostRun: func(req cmds.Request, res cmds.Response) {
if res.Error() != nil {
return
}
outChan, ok := res.Output().(<-chan interface{})
if !ok {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}
res.SetOutput(nil)
errors := false
for r0 := range outChan {
r := r0.(*filestore.ListRes)
if r.ErrorMsg != "" {
errors = true
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
} else {
fmt.Fprintf(res.Stdout(), "%s\n", r.FormatLong())
}
}
if errors {
res.SetError(fmt.Errorf("errors while displaying some entries"), cmds.ErrNormal)
}
},
Type: filestore.ListRes{},
}
var verifyFileStore = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Verify objects in filestore.",
LongDescription: `
Verify objects in the filestore.
If one or more <obj> is specified, only verify those specific objects;
otherwise verify all objects.
The output is:
<status> <hash> <size> <path> <offset>
Where <status> is one of:
ok: the block can be reconstructed
changed: the contents of the backing file have changed
no-file: the backing file could not be found
error: there was some other problem reading the file
missing: <obj> could not be found in the filestore
ERROR: internal error, most likely due to a corrupt database
For ERROR entries the error will also be printed to stderr.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("obj", false, true, "Cid of objects to verify."),
},
Run: func(req cmds.Request, res cmds.Response) {
_, fs, err := getFilestore(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
args := req.Arguments()
if len(args) > 0 {
out := perKeyActionToChan(args, func(c *cid.Cid) *filestore.ListRes {
return filestore.Verify(fs, c)
}, req.Context())
res.SetOutput(out)
} else {
next, err := filestore.VerifyAll(fs)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
out := listResToChan(next, req.Context())
res.SetOutput(out)
}
},
PostRun: func(req cmds.Request, res cmds.Response) {
if res.Error() != nil {
return
}
outChan, ok := res.Output().(<-chan interface{})
if !ok {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}
res.SetOutput(nil)
for r0 := range outChan {
r := r0.(*filestore.ListRes)
if r.Status == filestore.StatusOtherError {
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
}
fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong())
}
},
Type: filestore.ListRes{},
}
var dupsFileStore = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List blocks that are both in the filestore and standard block storage.",
},
Run: func(req cmds.Request, res cmds.Response) {
_, fs, err := getFilestore(req)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
ch, err := fs.FileManager().AllKeysChan(req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
out := make(chan interface{}, 128)
res.SetOutput((<-chan interface{})(out))
go func() {
defer close(out)
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
out <- &RefWrapper{Err: err.Error()}
return
}
if have {
out <- &RefWrapper{Ref: cid.String()}
}
}
}()
},
Marshalers: refsMarshallerMap,
Type: RefWrapper{},
}
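// getFilestore returns the node together with its Filestore, or an error
// when the filestore has not been enabled in the node's configuration.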
func getFilestore(req cmds.Request) (*core.IpfsNode, *filestore.Filestore, error) {
n, err := req.InvocContext().GetNode()
if err != nil {
return nil, nil, err
}
fs := n.Filestore
if fs == nil {
return n, nil, fmt.Errorf("filestore not enabled")
}
return n, fs, nil
}
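// listResToChan adapts the iterator returned by ListAll/VerifyAll into a
// buffered channel of results, stopping early if the context is cancelled.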
func listResToChan(next func() *filestore.ListRes, ctx context.Context) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
return
}
select {
case out <- r:
case <-ctx.Done():
return
}
}
}()
return out
}
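// perKeyActionToChan decodes every argument as a cid and applies the given
// action (List or Verify) to it; arguments that fail to decode are reported
// as StatusOtherError entries instead of aborting the whole run.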
func perKeyActionToChan(args []string, action func(*cid.Cid) *filestore.ListRes, ctx context.Context) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
out <- &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}
continue
}
r := action(c)
select {
case out <- r:
case <-ctx.Done():
return
}
}
}()
return out
}
@@ -47,6 +47,7 @@ ADVANCED COMMANDS
pin Pin objects to local storage
repo Manipulate the IPFS repository
stats Various operational stats
filestore Manage the filestore (experimental)
NETWORK COMMANDS
id Show info about IPFS peers
@@ -124,6 +125,7 @@ var rootSubcommands = map[string]*cmds.Command{
"update": ExternalBinary(),
"version": VersionCmd,
"bitswap": BitswapCmd,
"filestore": FileStoreCmd,
}
// RootRO is the readonly version of Root
......
@@ -19,6 +19,14 @@ type Filestore struct {
bs blockstore.Blockstore
}
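// FileManager returns the FileManager that resolves filestore blocks to
// their backing files on disk.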
func (f *Filestore) FileManager() *FileManager {
return f.fm
}
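// MainBlockstore returns the blockstore holding normally added
// (non-filestore) blocks.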
func (f *Filestore) MainBlockstore() blockstore.Blockstore {
return f.bs
}
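// NewFilestore creates a Filestore backed by the given blockstore and FileManager.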
func NewFilestore(bs blockstore.Blockstore, fm *FileManager) *Filestore {
return &Filestore{fm, bs}
}
......
@@ -28,7 +28,8 @@ type FileManager struct {
}
type CorruptReferenceError struct {
-Err error
+Code Status
+Err error
}
func (c CorruptReferenceError) Error() string {
@@ -108,6 +109,10 @@ func (f *FileManager) getDataObj(c *cid.Cid) (*pb.DataObj, error) {
//
}
return unmarshalDataObj(o)
}
func unmarshalDataObj(o interface{}) (*pb.DataObj, error) {
data, ok := o.([]byte)
if !ok {
return nil, fmt.Errorf("stored filestore dataobj was not a []byte")
@@ -127,20 +132,24 @@ func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
abspath := filepath.Join(f.root, p)
fi, err := os.Open(abspath)
-if err != nil {
-return nil, &CorruptReferenceError{err}
+if os.IsNotExist(err) {
+return nil, &CorruptReferenceError{StatusFileNotFound, err}
+} else if err != nil {
+return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()
_, err = fi.Seek(int64(d.GetOffset()), os.SEEK_SET)
if err != nil {
-return nil, &CorruptReferenceError{err}
+return nil, &CorruptReferenceError{StatusFileError, err}
}
outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
-if err != nil {
-return nil, &CorruptReferenceError{err}
+if err == io.EOF || err == io.ErrUnexpectedEOF {
+return nil, &CorruptReferenceError{StatusFileChanged, err}
+} else if err != nil {
+return nil, &CorruptReferenceError{StatusFileError, err}
}
outcid, err := c.Prefix().Sum(outbuf)
@@ -149,7 +158,8 @@ func (f *FileManager) readDataObj(c *cid.Cid, d *pb.DataObj) ([]byte, error) {
}
if !c.Equals(outcid) {
-return nil, &CorruptReferenceError{fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
+return nil, &CorruptReferenceError{StatusFileChanged,
+fmt.Errorf("data in file did not match. %s offset %d", d.GetFilePath(), d.GetOffset())}
}
return outbuf, nil
......
package filestore
import (
"fmt"
"github.com/ipfs/go-ipfs/blocks/blockstore"
pb "github.com/ipfs/go-ipfs/filestore/pb"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
)
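// Status describes the outcome of looking up or verifying a single filestore entry.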
type Status int32
const (
StatusOk Status = 0
StatusFileError Status = 10 // Backing File Error
StatusFileNotFound Status = 11 // Backing File Not Found
StatusFileChanged Status = 12 // Contents of the file changed
StatusOtherError Status = 20 // Internal Error, likely corrupt entry
StatusKeyNotFound Status = 30
)
func (s Status) String() string {
switch s {
case StatusOk:
return "ok"
case StatusFileError:
return "error"
case StatusFileNotFound:
return "no-file"
case StatusFileChanged:
return "changed"
case StatusOtherError:
return "ERROR"
case StatusKeyNotFound:
return "missing"
default:
return "???"
}
}
func (s Status) Format() string {
return fmt.Sprintf("%-7s", s.String())
}
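// ListRes is one row of 'filestore ls' or 'filestore verify' output.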
type ListRes struct {
Status Status
ErrorMsg string
Key *cid.Cid
FilePath string
Offset uint64
Size uint64
}
func (r *ListRes) FormatLong() string {
switch {
case r.Key == nil:
return "<corrupt key>"
case r.FilePath == "":
return r.Key.String()
default:
return fmt.Sprintf("%-50s %6d %s %d", r.Key, r.Size, r.FilePath, r.Offset)
}
}
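// List and ListAll only check that an entry can be read from the filestore
// datastore; Verify and VerifyAll additionally read the backing file and
// re-hash its contents against the requested cid.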
func List(fs *Filestore, key *cid.Cid) *ListRes {
return list(fs, false, key)
}
func ListAll(fs *Filestore) (func() *ListRes, error) {
return listAll(fs, false)
}
func Verify(fs *Filestore, key *cid.Cid) *ListRes {
return list(fs, true, key)
}
func VerifyAll(fs *Filestore) (func() *ListRes, error) {
return listAll(fs, true)
}
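// list fetches the DataObj for key and, when verify is set, also reads the
// backing file so any CorruptReferenceError is reflected in the result.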
func list(fs *Filestore, verify bool, key *cid.Cid) *ListRes {
dobj, err := fs.fm.getDataObj(key)
if err != nil {
return mkListRes(key, nil, err)
}
if verify {
_, err = fs.fm.readDataObj(key, dobj)
}
return mkListRes(key, dobj, err)
}
func listAll(fs *Filestore, verify bool) (func() *ListRes, error) {
q := dsq.Query{}
qr, err := fs.fm.ds.Query(q)
if err != nil {
return nil, err
}
return func() *ListRes {
cid, dobj, err := next(qr)
if dobj == nil && err == nil {
return nil
} else if err == nil && verify {
_, err = fs.fm.readDataObj(cid, dobj)
}
return mkListRes(cid, dobj, err)
}, nil
}
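// next pulls one entry from the datastore query, converting the key back to
// a cid and unmarshalling the stored DataObj; (nil, nil, nil) marks the end
// of the results.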
func next(qr dsq.Results) (*cid.Cid, *pb.DataObj, error) {
v, ok := qr.NextSync()
if !ok {
return nil, nil, nil
}
k := ds.RawKey(v.Key)
c, err := dshelp.DsKeyToCid(k)
if err != nil {
return nil, nil, fmt.Errorf("decoding cid from filestore: %s", err)
}
dobj, err := unmarshalDataObj(v.Value)
if err != nil {
return c, nil, err
}
return c, dobj, nil
}
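// mkListRes translates an error (if any) into a Status and builds the
// ListRes, copying the file path, size and offset from the DataObj when one
// is available.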
func mkListRes(c *cid.Cid, d *pb.DataObj, err error) *ListRes {
status := StatusOk
errorMsg := ""
if err != nil {
if err == ds.ErrNotFound || err == blockstore.ErrNotFound {
status = StatusKeyNotFound
} else if err, ok := err.(*CorruptReferenceError); ok {
status = err.Code
} else {
status = StatusOtherError
}
errorMsg = err.Error()
}
if d == nil {
return &ListRes{
Status: status,
ErrorMsg: errorMsg,
Key: c,
}
} else {
return &ListRes{
Status: status,
ErrorMsg: errorMsg,
Key: c,
FilePath: *d.FilePath,
Size: *d.Size_,
Offset: *d.Offset,
}
}
}
#!/bin/sh
#
# Copyright (c) 2017 Jeromy Johnson
# MIT Licensed; see the LICENSE file in this repository.
#
test_description="Test out the filestore nocopy functionality"
. lib/test-lib.sh
test_init_filestore() {
test_expect_success "clean up old node" '
rm -rf "$IPFS_PATH" mountdir ipfs ipns
'
test_init_ipfs
test_expect_success "enable filestore config setting" '
ipfs config --json Experimental.FilestoreEnabled true
'
}
test_init_dataset() {
test_expect_success "create a dataset" '
rm -rf somedir &&
mkdir somedir &&
random 1000 1 > somedir/file1 &&
random 10000 2 > somedir/file2 &&
random 1000000 3 > somedir/file3
'
}
test_init() {
test_init_filestore
test_init_dataset
}
EXPHASH="QmRueCuPMYYvdxWz1vWncF7wzCScEx4qasZXo5aVBb1R4V"
cat <<EOF > ls_expect
zb2rhaPkR7ZF9BzSC2BfqbcGivi9QMdauermW9YB6NvS7FZMo 10000 somedir/file2 0
zb2rhav4wcdvNXtaKDTWHYAqtUHMEpygT1cxqMsfK7QrDuHxH 262144 somedir/file3 524288
zb2rhbcZ3aUXYcrbhhDH1JyrpDcpdw1KFJ5Xs5covjnvMpxDR 1000 somedir/file1 0
zb2rhe28UqCDm7TFib7PRyQYEkvuq8iahcXA2AbgaxCLvNhfk 262144 somedir/file3 0
zb2rhebtyTTuHKyTbJPnkDUSruU5Uma4DN8t2EkvYZ6fP36mm 262144 somedir/file3 262144
zb2rhm9VTrX2mfatggYUk8mHLz78XBxVUTTzLvM2N3d6frdAU 213568 somedir/file3 786432
EOF
FILE1_HASH=zb2rhbcZ3aUXYcrbhhDH1JyrpDcpdw1KFJ5Xs5covjnvMpxDR
FILE2_HASH=zb2rhaPkR7ZF9BzSC2BfqbcGivi9QMdauermW9YB6NvS7FZMo
FILE3_HASH=QmfE4SDQazxTD7u8VTYs9AJqQL8rrJPUAorLeJXKSZrVf9
cat <<EOF > verify_expect
ok zb2rhaPkR7ZF9BzSC2BfqbcGivi9QMdauermW9YB6NvS7FZMo 10000 somedir/file2 0
ok zb2rhav4wcdvNXtaKDTWHYAqtUHMEpygT1cxqMsfK7QrDuHxH 262144 somedir/file3 524288
ok zb2rhbcZ3aUXYcrbhhDH1JyrpDcpdw1KFJ5Xs5covjnvMpxDR 1000 somedir/file1 0
ok zb2rhe28UqCDm7TFib7PRyQYEkvuq8iahcXA2AbgaxCLvNhfk 262144 somedir/file3 0
ok zb2rhebtyTTuHKyTbJPnkDUSruU5Uma4DN8t2EkvYZ6fP36mm 262144 somedir/file3 262144
ok zb2rhm9VTrX2mfatggYUk8mHLz78XBxVUTTzLvM2N3d6frdAU 213568 somedir/file3 786432
EOF
test_filestore_adds() {
test_expect_success "nocopy add succeeds" '
HASH=$(ipfs add --raw-leaves --nocopy -r -q somedir | tail -n1)
'
test_expect_success "nocopy add has right hash" '
test "$HASH" = "$EXPHASH"
'
test_expect_success "'ipfs filestore ls' output looks good'" '
ipfs filestore ls | sort > ls_actual &&
test_cmp ls_expect ls_actual
'
test_expect_success "'ipfs filestore ls HASH' works" '
ipfs filestore ls $FILE1_HASH > ls_actual &&
grep -q somedir/file1 ls_actual
'
test_expect_success "can retrieve multi-block file" '
ipfs cat $FILE3_HASH > file3.data &&
test_cmp somedir/file3 file3.data
'
}
# check that the filestore is in a clean state
test_filestore_state() {
test_expect_success "ipfs filestore verify' output looks good'" '
ipfs filestore verify | LC_ALL=C sort > verify_actual
test_cmp verify_expect verify_actual
'
}
test_filestore_verify() {
test_filestore_state
test_expect_success "'ipfs filestore verify HASH' works" '
ipfs filestore verify $FILE1_HASH > verify_actual &&
grep -q somedir/file1 verify_actual
'
test_expect_success "rename a file" '
mv somedir/file1 somedir/file1.bk
'
test_expect_success "can not retrieve block after backing file moved" '
test_must_fail ipfs cat $FILE1_HASH
'
test_expect_success "'ipfs filestore verify' shows file as missing" '
ipfs filestore verify > verify_actual &&
grep no-file verify_actual | grep -q somedir/file1
'
test_expect_success "move file back" '
mv somedir/file1.bk somedir/file1
'
test_expect_success "block okay now" '
ipfs cat $FILE1_HASH > file1.data &&
test_cmp somedir/file1 file1.data
'
test_expect_success "change first bit of file" '
dd if=/dev/zero of=somedir/file3 bs=1024 count=1
'
test_expect_success "can not retrieve block after backing file changed" '
test_must_fail ipfs cat $FILE3_HASH
'
test_expect_success "'ipfs filestore verify' shows file as changed" '
ipfs filestore verify > verify_actual &&
grep changed verify_actual | grep -q somedir/file3
'
# reset the state for the next test
test_init_dataset
}
test_filestore_dups() {
# make sure the filestore is in a clean state
test_filestore_state
test_expect_success "'ipfs filestore dups'" '
ipfs add --raw-leaves somedir/file1 &&
ipfs filestore dups > dups_actual &&
echo "$FILE1_HASH" > dups_expect
test_cmp dups_expect dups_actual
'
}
#
# No daemon
#
test_init
test_filestore_adds
test_filestore_verify
test_filestore_dups
#
# With daemon
#
test_init
# must be in offline mode so tests that retrieve non-existent blocks
# don't hang
test_launch_ipfs_daemon --offline
test_filestore_adds
test_filestore_verify
test_filestore_dups
test_kill_ipfs_daemon
test_done