Commit 11a66b3e authored by Juan Benet's avatar Juan Benet

Merge pull request #1558 from rht/cleanup-get

Refactor ipfs get
parents 27e68406 9f0c8134
package commands
import (
"bufio"
"compress/gzip"
"errors"
"fmt"
......@@ -11,14 +10,12 @@ import (
"strings"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
path "github.com/ipfs/go-ipfs/path"
tar "github.com/ipfs/go-ipfs/thirdparty/tar"
uio "github.com/ipfs/go-ipfs/unixfs/io"
utar "github.com/ipfs/go-ipfs/unixfs/tar"
uarchive "github.com/ipfs/go-ipfs/unixfs/archive"
)
var ErrInvalidCompressionLevel = errors.New("Compression level must be between 1 and 9")
......@@ -64,15 +61,16 @@ may also specify the level of compression by specifying '-l=<1-9>'.
res.SetError(err, cmds.ErrNormal)
return
}
p := path.Path(req.Arguments()[0])
var reader io.Reader
if archive, _, _ := req.Option("archive").Bool(); !archive && cmplvl != gzip.NoCompression {
// only use this when the flag is '-C' without '-a'
reader, err = getZip(req.Context(), node, p, cmplvl)
} else {
reader, err = get(req.Context(), node, p, cmplvl)
ctx := req.Context()
dn, err := core.Resolve(ctx, node, p)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
archive, _, _ := req.Option("archive").Bool()
reader, err := uarchive.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
......@@ -192,42 +190,3 @@ func getCompressOptions(req cmds.Request) (int, error) {
}
return gzip.NoCompression, nil
}
func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
dn, err := core.Resolve(ctx, node, p)
if err != nil {
return nil, err
}
return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
}
// getZip is equivalent to `ipfs getdag $hash | gzip`
func getZip(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
dagnode, err := core.Resolve(ctx, node, p)
if err != nil {
return nil, err
}
reader, err := uio.NewDagReader(ctx, dagnode, node.DAG)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
gw, err := gzip.NewWriterLevel(pw, compression)
if err != nil {
return nil, err
}
bufin := bufio.NewReader(reader)
go func() {
_, err := bufin.WriteTo(gw)
if err != nil {
log.Error("Fail to compress the stream")
}
gw.Close()
pw.Close()
}()
return pr, nil
}
......@@ -180,7 +180,7 @@ func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
return err
}
buf := resp.Data[:min(req.Size, int(r.Size()-req.Offset))]
buf := resp.Data[:min(req.Size, int(int64(r.Size())-req.Offset))]
n, err := io.ReadFull(r, buf)
if err != nil && err != io.EOF {
return err
......
package archive
import (
"bufio"
"compress/gzip"
"io"
"path"
cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
tar "github.com/ipfs/go-ipfs/unixfs/archive/tar"
uio "github.com/ipfs/go-ipfs/unixfs/io"
)
// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1048576
// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) {
_, filename := path.Split(name)
// need to connect a writer to a reader
piper, pipew := io.Pipe()
// use a buffered writer to parallelize task
bufw := bufio.NewWriterSize(pipew, DefaultBufSize)
// compression determines whether to use gzip compression.
var maybeGzw io.Writer
if compression != gzip.NoCompression {
var err error
maybeGzw, err = gzip.NewWriterLevel(bufw, compression)
if err != nil {
return nil, err
}
} else {
maybeGzw = bufw
}
if !archive && compression != gzip.NoCompression {
// the case when the node is a file
dagr, err := uio.NewDagReader(ctx, nd, dag)
if err != nil {
pipew.CloseWithError(err)
return nil, err
}
go func() {
if _, err := dagr.WriteTo(maybeGzw); err != nil {
pipew.CloseWithError(err)
return
}
pipew.Close() // everything seems to be ok.
}()
} else {
// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format
// construct the tar writer
w, err := tar.NewWriter(ctx, dag, archive, compression, maybeGzw)
if err != nil {
return nil, err
}
go func() {
// write all the nodes recursively
if err := w.WriteNode(nd, filename); err != nil {
pipew.CloseWithError(err)
return
}
if err := bufw.Flush(); err != nil {
pipew.CloseWithError(err)
return
}
w.Close()
pipew.Close() // everything seems to be ok.
}()
}
return piper, nil
}
......@@ -2,9 +2,6 @@ package tar
import (
"archive/tar"
"bufio"
"compress/gzip"
"fmt"
"io"
"path"
"time"
......@@ -13,87 +10,43 @@ import (
cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
upb "github.com/ipfs/go-ipfs/unixfs/pb"
)
// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1048576
func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
_, filename := path.Split(name)
// need to connect a writer to a reader
piper, pipew := io.Pipe()
// use a buffered writer to parallelize task
bufw := bufio.NewWriterSize(pipew, DefaultBufSize)
// construct the tar writer
w, err := NewWriter(bufw, dag, compression)
if err != nil {
return nil, err
}
// write all the nodes recursively
go func() {
if err := w.WriteNode(ctx, nd, filename); err != nil {
pipew.CloseWithError(err)
return
}
if err := bufw.Flush(); err != nil {
pipew.CloseWithError(err)
return
}
pipew.Close() // everything seems to be ok.
}()
return piper, nil
}
// Writer is a utility structure that helps to write
// unixfs merkledag nodes as a tar archive format.
// It wraps any io.Writer.
type Writer struct {
Dag mdag.DAGService
TarW *tar.Writer
ctx cxt.Context
}
// NewWriter wraps given io.Writer.
// compression determines whether to use gzip compression.
func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {
if compression != gzip.NoCompression {
var err error
w, err = gzip.NewWriterLevel(w, compression)
if err != nil {
return nil, err
}
}
func NewWriter(ctx cxt.Context, dag mdag.DAGService, archive bool, compression int, w io.Writer) (*Writer, error) {
return &Writer{
Dag: dag,
TarW: tar.NewWriter(w),
ctx: ctx,
}, nil
}
func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
if err := writeDirHeader(w.TarW, fpath); err != nil {
return err
}
for i, ng := range w.Dag.GetDAG(ctx, nd) {
child, err := ng.Get(ctx)
for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
child, err := ng.Get(w.ctx)
if err != nil {
return err
}
npath := path.Join(fpath, nd.Links[i].Name)
if err := w.WriteNode(ctx, child, npath); err != nil {
if err := w.WriteNode(child, npath); err != nil {
return err
}
}
......@@ -101,46 +54,33 @@ func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
return nil
}
func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
pb := new(upb.Data)
if err := proto.Unmarshal(nd.Data, pb); err != nil {
return err
}
return w.writeFile(ctx, nd, pb, fpath)
}
func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
func (w *Writer) writeFile(nd *mdag.Node, pb *upb.Data, fpath string) error {
if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
return err
}
dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
if err != nil {
return err
}
_, err = io.Copy(w.TarW, dagr)
if err != nil && err != io.EOF {
return err
}
return nil
dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
_, err := dagr.WriteTo(w.TarW)
return err
}
func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
func (w *Writer) WriteNode(nd *mdag.Node, fpath string) error {
pb := new(upb.Data)
if err := proto.Unmarshal(nd.Data, pb); err != nil {
return err
}
switch pb.GetType() {
case upb.Data_Metadata:
fallthrough
case upb.Data_Directory:
return w.WriteDir(ctx, nd, fpath)
return w.writeDir(nd, fpath)
case upb.Data_Raw:
fallthrough
case upb.Data_File:
return w.writeFile(ctx, nd, pb, fpath)
return w.writeFile(nd, pb, fpath)
default:
return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
return ft.ErrUnrecognizedType
}
}
......
......@@ -58,8 +58,7 @@ type ReadSeekCloser interface {
// node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
pb := new(ftpb.Data)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
if err := proto.Unmarshal(n.Data, pb); err != nil {
return nil, err
}
......@@ -70,7 +69,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
case ftpb.Data_Raw:
fallthrough
case ftpb.Data_File:
return newDataFileReader(ctx, n, pb, serv), nil
return NewDataFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
......@@ -85,7 +84,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
}
}
func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
fctx, cancel := context.WithCancel(ctx)
promises := serv.GetDAG(fctx, n)
return &DagReader{
......@@ -124,7 +123,7 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
// A directory should not exist within a file
return ft.ErrInvalidDirLocation
case ftpb.Data_File:
dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
return nil
case ftpb.Data_Raw:
dr.buf = NewRSNCFromBytes(pb.GetData())
......@@ -137,8 +136,8 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
}
// Size return the total length of the data from the DAG structured file.
func (dr *DagReader) Size() int64 {
return int64(dr.pbdata.GetFilesize())
func (dr *DagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
}
// Read reads data from the DAG structured file
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment