Commit dcb7a531 authored by Jeromy's avatar Jeromy

teach unixfs/tar (aka ipfs get) how to use GetBlocks

parent 7e7fb827
......@@ -60,20 +60,20 @@ func NewReader(path path.Path, dag mdag.DAGService, resolver *path.Resolver, com
return reader, nil
}
func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
pb := new(upb.Data)
err := proto.Unmarshal(dagnode.Data, pb)
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
if depth == 0 {
defer i.close()
defer r.close()
}
if pb.GetType() == upb.Data_Directory {
err = i.writer.WriteHeader(&tar.Header{
err = r.writer.WriteHeader(&tar.Header{
Name: path,
Typeflag: tar.TypeDir,
Mode: 0777,
......@@ -81,23 +81,25 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
// TODO: set mode, dates, etc. when added to unixFS
})
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
i.flush()
r.flush()
for _, link := range dagnode.Links {
childNode, err := link.GetNode(i.dag)
ctx, _ := context.WithTimeout(context.TODO(), time.Second*60)
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
childNode, err := ng.Get()
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
i.writeToBuf(childNode, gopath.Join(path, link.Name), depth+1)
r.writeToBuf(childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
}
return
}
err = i.writer.WriteHeader(&tar.Header{
err = r.writer.WriteHeader(&tar.Header{
Name: path,
Size: int64(pb.GetFilesize()),
Typeflag: tar.TypeReg,
......@@ -106,95 +108,95 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
// TODO: set mode, dates, etc. when added to unixFS
})
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
i.flush()
r.flush()
reader, err := uio.NewDagReader(context.TODO(), dagnode, i.dag)
reader, err := uio.NewDagReader(context.TODO(), dagnode, r.dag)
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
err = i.syncCopy(reader)
err = r.syncCopy(reader)
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
}
func (i *Reader) Read(p []byte) (int, error) {
func (r *Reader) Read(p []byte) (int, error) {
// wait for the goroutine that is writing data to the buffer to tell us
// there is something to read
if !i.closed {
<-i.signalChan
if !r.closed {
<-r.signalChan
}
if i.err != nil {
return 0, i.err
if r.err != nil {
return 0, r.err
}
if !i.closed {
defer i.signal()
if !r.closed {
defer r.signal()
}
if i.buf.Len() == 0 {
if i.closed {
if r.buf.Len() == 0 {
if r.closed {
return 0, io.EOF
}
return 0, nil
}
n, err := i.buf.Read(p)
if err == io.EOF && !i.closed || i.buf.Len() > 0 {
n, err := r.buf.Read(p)
if err == io.EOF && !r.closed || r.buf.Len() > 0 {
return n, nil
}
return n, err
}
func (i *Reader) signal() {
i.signalChan <- struct{}{}
func (r *Reader) signal() {
r.signalChan <- struct{}{}
}
func (i *Reader) flush() {
i.signal()
<-i.signalChan
func (r *Reader) flush() {
r.signal()
<-r.signalChan
}
func (i *Reader) emitError(err error) {
i.err = err
i.signal()
func (r *Reader) emitError(err error) {
r.err = err
r.signal()
}
func (i *Reader) close() {
i.closed = true
defer i.signal()
err := i.writer.Close()
func (r *Reader) close() {
r.closed = true
defer r.signal()
err := r.writer.Close()
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
if i.gzipWriter != nil {
err = i.gzipWriter.Close()
if r.gzipWriter != nil {
err = r.gzipWriter.Close()
if err != nil {
i.emitError(err)
r.emitError(err)
return
}
}
}
func (i *Reader) syncCopy(reader io.Reader) error {
func (r *Reader) syncCopy(reader io.Reader) error {
buf := make([]byte, 32*1024)
for {
nr, err := reader.Read(buf)
if nr > 0 {
_, err := i.writer.Write(buf[:nr])
_, err := r.writer.Write(buf[:nr])
if err != nil {
return err
}
i.flush()
r.flush()
}
if err == io.EOF {
break
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment