reader.go 3.85 KB
Newer Older
1 2 3 4 5 6 7
package tar

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"io"
Matt Bell's avatar
Matt Bell committed
8
	gopath "path"
9
	"time"
10

11
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
	mdag "github.com/jbenet/go-ipfs/merkledag"
	path "github.com/jbenet/go-ipfs/path"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	upb "github.com/jbenet/go-ipfs/unixfs/pb"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)

type Reader struct {
	buf        bytes.Buffer
	closed     bool
	signalChan chan struct{}
	dag        mdag.DAGService
	resolver   *path.Resolver
	writer     *tar.Writer
	gzipWriter *gzip.Writer
	err        error
}

Jeromy's avatar
Jeromy committed
31
func NewReader(path path.Path, dag mdag.DAGService, resolver *path.Resolver, compression int) (*Reader, error) {
Matt Bell's avatar
Matt Bell committed
32

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
	reader := &Reader{
		signalChan: make(chan struct{}),
		dag:        dag,
		resolver:   resolver,
	}

	var err error
	if compression != gzip.NoCompression {
		reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
		if err != nil {
			return nil, err
		}
		reader.writer = tar.NewWriter(reader.gzipWriter)
	} else {
		reader.writer = tar.NewWriter(&reader.buf)
	}

	dagnode, err := resolver.ResolvePath(path)
	if err != nil {
		return nil, err
	}

	// writeToBuf will write the data to the buffer, and will signal when there
	// is new data to read
Jeromy's avatar
Jeromy committed
57
	_, filename := gopath.Split(path.String())
58
	go reader.writeToBuf(dagnode, filename, 0)
59 60 61 62

	return reader, nil
}

63
func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
64 65 66
	pb := new(upb.Data)
	err := proto.Unmarshal(dagnode.Data, pb)
	if err != nil {
67
		r.emitError(err)
68 69 70 71
		return
	}

	if depth == 0 {
72
		defer r.close()
73 74 75
	}

	if pb.GetType() == upb.Data_Directory {
76
		err = r.writer.WriteHeader(&tar.Header{
77 78 79
			Name:     path,
			Typeflag: tar.TypeDir,
			Mode:     0777,
80
			ModTime:  time.Now(),
81 82 83
			// TODO: set mode, dates, etc. when added to unixFS
		})
		if err != nil {
84
			r.emitError(err)
85 86
			return
		}
87
		r.flush()
88

89 90 91 92
		ctx, _ := context.WithTimeout(context.TODO(), time.Second*60)

		for i, ng := range r.dag.GetDAG(ctx, dagnode) {
			childNode, err := ng.Get()
93
			if err != nil {
94
				r.emitError(err)
95 96
				return
			}
97
			r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
98 99 100 101
		}
		return
	}

102
	err = r.writer.WriteHeader(&tar.Header{
103 104 105 106
		Name:     path,
		Size:     int64(pb.GetFilesize()),
		Typeflag: tar.TypeReg,
		Mode:     0644,
107
		ModTime:  time.Now(),
108 109 110
		// TODO: set mode, dates, etc. when added to unixFS
	})
	if err != nil {
111
		r.emitError(err)
112 113
		return
	}
114
	r.flush()
115

116
	reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
117
	if err != nil {
118
		r.emitError(err)
119 120 121
		return
	}

122
	err = r.syncCopy(reader)
123
	if err != nil {
124
		r.emitError(err)
125 126 127 128
		return
	}
}

129
func (r *Reader) Read(p []byte) (int, error) {
130 131
	// wait for the goroutine that is writing data to the buffer to tell us
	// there is something to read
132 133
	if !r.closed {
		<-r.signalChan
134 135
	}

136 137
	if r.err != nil {
		return 0, r.err
138 139
	}

140 141
	if !r.closed {
		defer r.signal()
142 143
	}

144 145
	if r.buf.Len() == 0 {
		if r.closed {
146 147 148 149 150
			return 0, io.EOF
		}
		return 0, nil
	}

151 152
	n, err := r.buf.Read(p)
	if err == io.EOF && !r.closed || r.buf.Len() > 0 {
153 154 155 156 157 158
		return n, nil
	}

	return n, err
}

159 160
func (r *Reader) signal() {
	r.signalChan <- struct{}{}
161 162
}

163 164 165
func (r *Reader) flush() {
	r.signal()
	<-r.signalChan
166 167
}

168 169 170
func (r *Reader) emitError(err error) {
	r.err = err
	r.signal()
171 172
}

173 174 175 176
func (r *Reader) close() {
	r.closed = true
	defer r.signal()
	err := r.writer.Close()
177
	if err != nil {
178
		r.emitError(err)
179 180
		return
	}
181 182
	if r.gzipWriter != nil {
		err = r.gzipWriter.Close()
183
		if err != nil {
184
			r.emitError(err)
185 186 187 188 189
			return
		}
	}
}

190
func (r *Reader) syncCopy(reader io.Reader) error {
191 192 193 194
	buf := make([]byte, 32*1024)
	for {
		nr, err := reader.Read(buf)
		if nr > 0 {
195
			_, err := r.writer.Write(buf[:nr])
196 197 198
			if err != nil {
				return err
			}
199
			r.flush()
200 201 202 203 204 205 206 207 208 209
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
	}
	return nil
}