reader.go 3.75 KB
Newer Older
1 2 3 4 5 6 7
package tar

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"io"
Matt Bell's avatar
Matt Bell committed
8
	gopath "path"
9
	"time"
10

11
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
12
	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
13

14 15 16 17
	mdag "github.com/ipfs/go-ipfs/merkledag"
	path "github.com/ipfs/go-ipfs/path"
	uio "github.com/ipfs/go-ipfs/unixfs/io"
	upb "github.com/ipfs/go-ipfs/unixfs/pb"
18 19 20 21 22 23 24 25 26 27 28 29 30
)

// Reader produces a tar archive (optionally gzip-compressed) of a DAG node
// and exposes it through Read. Archive bytes are generated by a goroutine
// started in NewReader and staged in buf; that goroutine and Read take turns
// on the buffer by exchanging values over signalChan.
type Reader struct {
	// buf is the destination the tar (or gzip) writer writes into and the
	// source Read drains.
	buf        bytes.Buffer
	// closed is set by close once the writing side is finishing; Read uses
	// it to decide whether to wait for a signal and when to report io.EOF.
	closed     bool
	// signalChan is created unbuffered by NewReader, so every send blocks
	// until the other side receives.
	signalChan chan struct{}
	// dag is used to fetch directory children and to read file contents.
	dag        mdag.DAGService
	// resolver is not assigned or read anywhere in this file.
	// NOTE(review): possibly unused - confirm against the rest of the package.
	resolver   *path.Resolver
	// writer is the tar writer; it wraps gzipWriter when compression is
	// enabled and buf directly otherwise.
	writer     *tar.Writer
	// gzipWriter is nil when NewReader was given gzip.NoCompression.
	gzipWriter *gzip.Writer
	// err is recorded by emitError and returned by Read.
	err        error
}

31
func NewReader(path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
Matt Bell's avatar
Matt Bell committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
	reader := &Reader{
		signalChan: make(chan struct{}),
		dag:        dag,
	}

	var err error
	if compression != gzip.NoCompression {
		reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
		if err != nil {
			return nil, err
		}
		reader.writer = tar.NewWriter(reader.gzipWriter)
	} else {
		reader.writer = tar.NewWriter(&reader.buf)
	}

	// writeToBuf will write the data to the buffer, and will signal when there
	// is new data to read
Jeromy's avatar
Jeromy committed
51
	_, filename := gopath.Split(path.String())
52
	go reader.writeToBuf(dagnode, filename, 0)
53 54 55 56

	return reader, nil
}

57
func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
58 59 60
	pb := new(upb.Data)
	err := proto.Unmarshal(dagnode.Data, pb)
	if err != nil {
61
		r.emitError(err)
62 63 64 65
		return
	}

	if depth == 0 {
66
		defer r.close()
67 68 69
	}

	if pb.GetType() == upb.Data_Directory {
70
		err = r.writer.WriteHeader(&tar.Header{
71 72 73
			Name:     path,
			Typeflag: tar.TypeDir,
			Mode:     0777,
74
			ModTime:  time.Now(),
75 76 77
			// TODO: set mode, dates, etc. when added to unixFS
		})
		if err != nil {
78
			r.emitError(err)
79 80
			return
		}
81
		r.flush()
82

83 84
		ctx, cancel := context.WithTimeout(context.TODO(), time.Second*60)
		defer cancel()
85 86

		for i, ng := range r.dag.GetDAG(ctx, dagnode) {
87
			childNode, err := ng.Get(ctx)
88
			if err != nil {
89
				r.emitError(err)
90 91
				return
			}
92
			r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
93 94 95 96
		}
		return
	}

97
	err = r.writer.WriteHeader(&tar.Header{
98 99 100 101
		Name:     path,
		Size:     int64(pb.GetFilesize()),
		Typeflag: tar.TypeReg,
		Mode:     0644,
102
		ModTime:  time.Now(),
103 104 105
		// TODO: set mode, dates, etc. when added to unixFS
	})
	if err != nil {
106
		r.emitError(err)
107 108
		return
	}
109
	r.flush()
110

111
	reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
112
	if err != nil {
113
		r.emitError(err)
114 115 116
		return
	}

117
	err = r.syncCopy(reader)
118
	if err != nil {
119
		r.emitError(err)
120 121 122 123
		return
	}
}

124
func (r *Reader) Read(p []byte) (int, error) {
125 126
	// wait for the goroutine that is writing data to the buffer to tell us
	// there is something to read
127 128
	if !r.closed {
		<-r.signalChan
129 130
	}

131 132
	if r.err != nil {
		return 0, r.err
133 134
	}

135 136
	if !r.closed {
		defer r.signal()
137 138
	}

139 140
	if r.buf.Len() == 0 {
		if r.closed {
141 142 143 144 145
			return 0, io.EOF
		}
		return 0, nil
	}

146 147
	n, err := r.buf.Read(p)
	if err == io.EOF && !r.closed || r.buf.Len() > 0 {
148 149 150 151 152 153
		return n, nil
	}

	return n, err
}

154 155
func (r *Reader) signal() {
	r.signalChan <- struct{}{}
156 157
}

158 159 160
func (r *Reader) flush() {
	r.signal()
	<-r.signalChan
161 162
}

163 164 165
func (r *Reader) emitError(err error) {
	r.err = err
	r.signal()
166 167
}

168 169 170 171
func (r *Reader) close() {
	r.closed = true
	defer r.signal()
	err := r.writer.Close()
172
	if err != nil {
173
		r.emitError(err)
174 175
		return
	}
176 177
	if r.gzipWriter != nil {
		err = r.gzipWriter.Close()
178
		if err != nil {
179
			r.emitError(err)
180 181 182 183 184
			return
		}
	}
}

185
func (r *Reader) syncCopy(reader io.Reader) error {
186 187 188 189
	buf := make([]byte, 32*1024)
	for {
		nr, err := reader.Read(buf)
		if nr > 0 {
190
			_, err := r.writer.Write(buf[:nr])
191 192 193
			if err != nil {
				return err
			}
194
			r.flush()
195 196 197 198 199 200 201 202 203 204
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}
	return nil
}