dagreader.go 3.13 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package io

import (
	"bytes"
	"errors"
	"io"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
11
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
12 13 14 15 16 17 18
	u "github.com/jbenet/go-ipfs/util"
)

// ErrIsDir is returned by NewDagReader when the given node is a unixfs
// directory, since directories are not allowed to be read.
var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
19
	serv     mdag.DAGService
20 21 22 23 24 25 26
	node     *mdag.Node
	position int
	buf      *bytes.Buffer
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
27
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
28
	pb := new(ftpb.Data)
29 30 31 32 33 34
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
35
	case ftpb.Data_Directory:
36 37
		// Dont allow reading directories
		return nil, ErrIsDir
38
	case ftpb.Data_File:
39 40 41 42 43
		return &DagReader{
			node: n,
			serv: serv,
			buf:  bytes.NewBuffer(pb.GetData()),
		}, nil
44
	case ftpb.Data_Raw:
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
		// Raw block will just be a single level, return a byte buffer
		return bytes.NewBuffer(pb.GetData()), nil
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

// Follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
	if dr.position >= len(dr.node.Links) {
		return io.EOF
	}
	nxtLink := dr.node.Links[dr.position]
	nxt := nxtLink.Node
	if nxt == nil {
		nxtNode, err := dr.serv.Get(u.Key(nxtLink.Hash))
		if err != nil {
			return err
		}
		nxt = nxtNode
	}
67
	pb := new(ftpb.Data)
68 69 70 71 72 73 74
	err := proto.Unmarshal(nxt.Data, pb)
	if err != nil {
		return err
	}
	dr.position++

	switch pb.GetType() {
75
	case ftpb.Data_Directory:
76
		return ft.ErrInvalidDirLocation
77
	case ftpb.Data_File:
78 79 80
		//TODO: this *should* work, needs testing first
		//return NewDagReader(nxt, dr.serv)
		panic("Not yet handling different layers of indirection!")
81
	case ftpb.Data_Raw:
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
		dr.buf = bytes.NewBuffer(pb.GetData())
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

// Read fills b with data from the dag, pulling in successive linked blocks
// as the current one runs dry. It returns the number of bytes written into b.
// A nil error is returned only when b was filled completely; otherwise the
// error from loading the next block (io.EOF when no links remain) is
// returned together with the bytes read so far.
func (dr *DagReader) Read(b []byte) (int, error) {
	// Nothing buffered yet: load the first block before reading.
	if dr.buf == nil {
		if err := dr.precalcNextBuf(); err != nil {
			return 0, err
		}
	}

	filled := 0
	for {
		// Drain whatever the current block still holds into the free part of b.
		n, readErr := dr.buf.Read(b[filled:])
		filled += n
		// io.EOF only means the current block is exhausted; anything else is fatal.
		if readErr != nil && readErr != io.EOF {
			return filled, readErr
		}

		// The caller's slice is full, we are done.
		if filled == len(b) {
			return filled, nil
		}

		// Still room in b: move on to the next block.
		if nextErr := dr.precalcNextBuf(); nextErr != nil {
			return filled, nextErr
		}
	}
}

/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		for i := 0; i < len(dr.node.Links); i++ {
			nsize := dr.node.Links[i].Size - 8
			if offset > nsize {
				offset -= nsize
			} else {
				break
			}
		}
		dr.position = i
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	case os.SEEK_CUR:
	case os.SEEK_END:
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
*/