archive.go 2.68 KB
Newer Older
Hector Sanjuan's avatar
Hector Sanjuan committed
1
// Package archive provides utilities to archive and compress a [Unixfs] DAG.
rht's avatar
rht committed
2 3 4 5 6
package archive

import (
	"bufio"
	"compress/gzip"
Jeromy's avatar
Jeromy committed
7
	"context"
rht's avatar
rht committed
8 9 10 11 12
	"io"
	"path"

	tar "github.com/ipfs/go-ipfs/unixfs/archive/tar"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
Jeromy's avatar
Jeromy committed
13

14
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
rht's avatar
rht committed
15 16 17 18 19 20
)

// DefaultBufSize is the size, in bytes, of the write buffer placed in front of
// the pipe used for gets. It is currently 1 MiB, which is roughly 4 blocks.
// TODO: does this need to be configurable?
var DefaultBufSize = 1 << 20

rht's avatar
rht committed
21 22 23 24 25 26 27 28 29 30 31 32
// identityWriteCloser adapts a plain io.Writer into an io.WriteCloser. Writes
// are passed through unchanged and Close does nothing, so it can stand in for
// a compressing writer when no compression is wanted.
type identityWriteCloser struct {
	w io.Writer
}

// Write forwards data to the wrapped writer and reports that writer's result.
func (iwc *identityWriteCloser) Write(data []byte) (n int, err error) {
	n, err = iwc.w.Write(data)
	return n, err
}

// Close is a no-op: the wrapped writer is left untouched and nil is returned.
func (iwc *identityWriteCloser) Close() error {
	return nil
}

rht's avatar
rht committed
33
// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
34
func DagArchive(ctx context.Context, nd ipld.Node, name string, dag ipld.DAGService, archive bool, compression int) (io.Reader, error) {
rht's avatar
rht committed
35

Lucas Molas's avatar
Lucas Molas committed
36 37
	cleaned := path.Clean(name)
	_, filename := path.Split(cleaned)
rht's avatar
rht committed
38 39 40

	// need to connect a writer to a reader
	piper, pipew := io.Pipe()
rht's avatar
rht committed
41 42 43 44 45 46 47
	checkErrAndClosePipe := func(err error) bool {
		if err != nil {
			pipew.CloseWithError(err)
			return true
		}
		return false
	}
rht's avatar
rht committed
48 49 50 51 52

	// use a buffered writer to parallelize task
	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

	// compression determines whether to use gzip compression.
rht's avatar
rht committed
53 54 55 56 57 58 59 60
	maybeGzw, err := newMaybeGzWriter(bufw, compression)
	if checkErrAndClosePipe(err) {
		return nil, err
	}

	closeGzwAndPipe := func() {
		if err := maybeGzw.Close(); checkErrAndClosePipe(err) {
			return
rht's avatar
rht committed
61
		}
rht's avatar
rht committed
62 63 64 65
		if err := bufw.Flush(); checkErrAndClosePipe(err) {
			return
		}
		pipew.Close() // everything seems to be ok.
rht's avatar
rht committed
66 67 68 69 70
	}

	if !archive && compression != gzip.NoCompression {
		// the case when the node is a file
		dagr, err := uio.NewDagReader(ctx, nd, dag)
rht's avatar
rht committed
71
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
72 73 74 75
			return nil, err
		}

		go func() {
rht's avatar
rht committed
76
			if _, err := dagr.WriteTo(maybeGzw); checkErrAndClosePipe(err) {
rht's avatar
rht committed
77 78
				return
			}
rht's avatar
rht committed
79
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
80 81 82 83 84 85
		}()
	} else {
		// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format

		// construct the tar writer
		w, err := tar.NewWriter(ctx, dag, archive, compression, maybeGzw)
rht's avatar
rht committed
86
		if checkErrAndClosePipe(err) {
rht's avatar
rht committed
87 88 89 90 91
			return nil, err
		}

		go func() {
			// write all the nodes recursively
rht's avatar
rht committed
92
			if err := w.WriteNode(nd, filename); checkErrAndClosePipe(err) {
rht's avatar
rht committed
93 94
				return
			}
rht's avatar
rht committed
95 96
			w.Close()         // close tar writer
			closeGzwAndPipe() // everything seems to be ok
rht's avatar
rht committed
97 98 99 100 101
		}()
	}

	return piper, nil
}
rht's avatar
rht committed
102 103 104 105 106 107 108

// newMaybeGzWriter wraps w according to the requested gzip compression level.
//
// When compression is gzip.NoCompression the returned writer passes data
// through to w unchanged and its Close is a no-op. For any other level a
// gzip writer at that level is returned; Close on it must be called to
// finish the gzip stream. An invalid level yields a nil writer and the error
// reported by gzip.NewWriterLevel.
func newMaybeGzWriter(w io.Writer, compression int) (io.WriteCloser, error) {
	if compression == gzip.NoCompression {
		return &identityWriteCloser{w}, nil
	}

	gzw, err := gzip.NewWriterLevel(w, compression)
	if err != nil {
		// Return an explicit nil: passing the nil *gzip.Writer through the
		// io.WriteCloser return type would produce a non-nil interface.
		return nil, err
	}
	return gzw, nil
}