archive.go 2.61 KB
Newer Older
rht's avatar
rht committed
1 2 3 4 5
package archive

import (
	"bufio"
	"compress/gzip"
Jeromy's avatar
Jeromy committed
6
	"context"
rht's avatar
rht committed
7 8 9 10 11 12
	"io"
	"path"

	mdag "github.com/ipfs/go-ipfs/merkledag"
	tar "github.com/ipfs/go-ipfs/unixfs/archive/tar"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
13

Steven Allen's avatar
Steven Allen committed
14
	node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
rht's avatar
rht committed
15 16 17 18 19 20
)

// DefaultBufSize is the size, in bytes, of the write buffer used for gets.
// It is currently 1 MiB, which is roughly four blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1 << 20

rht's avatar
rht committed
21 22 23 24 25 26 27 28 29 30 31 32
// identityWriteCloser adapts a plain io.Writer to the io.WriteCloser
// interface. Writes are forwarded unchanged and Close does nothing, so the
// wrapped writer is never closed through this type.
type identityWriteCloser struct {
	w io.Writer
}

// Write hands p to the wrapped writer and returns its result untouched.
func (c *identityWriteCloser) Write(p []byte) (int, error) {
	written, err := c.w.Write(p)
	return written, err
}

// Close is a no-op and always reports success.
func (c *identityWriteCloser) Close() error { return nil }

rht's avatar
rht committed
33
// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
Jeromy's avatar
Jeromy committed
34
func DagArchive(ctx context.Context, nd node.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) {
rht's avatar
rht committed
35 36 37 38 39

	_, filename := path.Split(name)

	// need to connect a writer to a reader
	piper, pipew := io.Pipe()
rht's avatar
rht committed
40 41 42 43 44 45 46
	checkErrAndClosePipe := func(err error) bool {
		if err != nil {
			pipew.CloseWithError(err)
			return true
		}
		return false
	}
rht's avatar
rht committed
47 48 49 50 51

	// use a buffered writer to parallelize task
	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

	// compression determines whether to use gzip compression.
rht's avatar
rht committed
52 53 54 55 56 57 58 59
	maybeGzw, err := newMaybeGzWriter(bufw, compression)
	if checkErrAndClosePipe(err) {
		return nil, err
	}

	closeGzwAndPipe := func() {
		if err := maybeGzw.Close(); checkErrAndClosePipe(err) {
			return
rht's avatar
rht committed
60
		}
rht's avatar
rht committed
61 62 63 64
		if err := bufw.Flush(); checkErrAndClosePipe(err) {
			return
		}
		pipew.Close() // everything seems to be ok.
rht's avatar
rht committed
65 66 67 68 69
	}

	if !archive && compression != gzip.NoCompression {
		// the case when the node is a file
		dagr, err := uio.NewDagReader(ctx, nd, dag)
rht's avatar
rht committed
70
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
71 72 73 74
			return nil, err
		}

		go func() {
rht's avatar
rht committed
75
			if _, err := dagr.WriteTo(maybeGzw); checkErrAndClosePipe(err) {
rht's avatar
rht committed
76 77
				return
			}
rht's avatar
rht committed
78
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
79 80 81 82 83 84
		}()
	} else {
		// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format

		// construct the tar writer
		w, err := tar.NewWriter(ctx, dag, archive, compression, maybeGzw)
rht's avatar
rht committed
85
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
86 87 88 89 90
			return nil, err
		}

		go func() {
			// write all the nodes recursively
rht's avatar
rht committed
91
			if err := w.WriteNode(nd, filename); checkErrAndClosePipe(err) {
rht's avatar
rht committed
92 93
				return
			}
rht's avatar
rht committed
94 95
			w.Close()         // close tar writer
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
96 97 98 99 100
		}()
	}

	return piper, nil
}
rht's avatar
rht committed
101 102 103 104 105 106 107

// newMaybeGzWriter returns the writer that output should be sent through.
//
// When compression is gzip.NoCompression the result is a pass-through
// wrapper around w whose Close is a no-op. Otherwise the result is a gzip
// writer on top of w created with gzip.NewWriterLevel at the given level.
//
// If gzip rejects the level, the error is returned together with a literal
// nil io.WriteCloser.
func newMaybeGzWriter(w io.Writer, compression int) (io.WriteCloser, error) {
	if compression == gzip.NoCompression {
		return &identityWriteCloser{w}, nil
	}

	gzw, err := gzip.NewWriterLevel(w, compression)
	if err != nil {
		// Returning gzw here would wrap a nil *gzip.Writer in a non-nil
		// interface value; return an untyped nil so `wc == nil` holds.
		return nil, err
	}
	return gzw, nil
}