archive.go 2.57 KB
Newer Older
rht's avatar
rht committed
1 2 3 4 5
package archive

import (
	"bufio"
	"compress/gzip"
Jeromy's avatar
Jeromy committed
6
	"context"
rht's avatar
rht committed
7 8 9 10 11
	"io"
	"path"

	tar "github.com/ipfs/go-ipfs/unixfs/archive/tar"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
12

Steven Allen's avatar
Steven Allen committed
13
	node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
rht's avatar
rht committed
14 15 16 17 18 19
)

// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1048576

rht's avatar
rht committed
20 21 22 23 24 25 26 27 28 29 30 31
type identityWriteCloser struct {
	w io.Writer
}

func (i *identityWriteCloser) Write(p []byte) (int, error) {
	return i.w.Write(p)
}

func (i *identityWriteCloser) Close() error {
	return nil
}

rht's avatar
rht committed
32
// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
33
func DagArchive(ctx context.Context, nd node.Node, name string, dag node.DAGService, archive bool, compression int) (io.Reader, error) {
rht's avatar
rht committed
34 35 36 37 38

	_, filename := path.Split(name)

	// need to connect a writer to a reader
	piper, pipew := io.Pipe()
rht's avatar
rht committed
39 40 41 42 43 44 45
	checkErrAndClosePipe := func(err error) bool {
		if err != nil {
			pipew.CloseWithError(err)
			return true
		}
		return false
	}
rht's avatar
rht committed
46 47 48 49 50

	// use a buffered writer to parallelize task
	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

	// compression determines whether to use gzip compression.
rht's avatar
rht committed
51 52 53 54 55 56 57 58
	maybeGzw, err := newMaybeGzWriter(bufw, compression)
	if checkErrAndClosePipe(err) {
		return nil, err
	}

	closeGzwAndPipe := func() {
		if err := maybeGzw.Close(); checkErrAndClosePipe(err) {
			return
rht's avatar
rht committed
59
		}
rht's avatar
rht committed
60 61 62 63
		if err := bufw.Flush(); checkErrAndClosePipe(err) {
			return
		}
		pipew.Close() // everything seems to be ok.
rht's avatar
rht committed
64 65 66 67 68
	}

	if !archive && compression != gzip.NoCompression {
		// the case when the node is a file
		dagr, err := uio.NewDagReader(ctx, nd, dag)
rht's avatar
rht committed
69
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
70 71 72 73
			return nil, err
		}

		go func() {
rht's avatar
rht committed
74
			if _, err := dagr.WriteTo(maybeGzw); checkErrAndClosePipe(err) {
rht's avatar
rht committed
75 76
				return
			}
rht's avatar
rht committed
77
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
78 79 80 81 82 83
		}()
	} else {
		// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format

		// construct the tar writer
		w, err := tar.NewWriter(ctx, dag, archive, compression, maybeGzw)
rht's avatar
rht committed
84
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
85 86 87 88 89
			return nil, err
		}

		go func() {
			// write all the nodes recursively
rht's avatar
rht committed
90
			if err := w.WriteNode(nd, filename); checkErrAndClosePipe(err) {
rht's avatar
rht committed
91 92
				return
			}
rht's avatar
rht committed
93 94
			w.Close()         // close tar writer
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
95 96 97 98 99
		}()
	}

	return piper, nil
}
rht's avatar
rht committed
100 101 102 103 104 105 106

func newMaybeGzWriter(w io.Writer, compression int) (io.WriteCloser, error) {
	if compression != gzip.NoCompression {
		return gzip.NewWriterLevel(w, compression)
	}
	return &identityWriteCloser{w}, nil
}