Commit f7fa0f60 authored by Jeromy's avatar Jeromy

Rework package structure for unixfs and subpackage

cc @jbenet
parent 7744373c
package merkledag
import (
"bytes"
"errors"
"io"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ft "github.com/jbenet/go-ipfs/importer/format"
u "github.com/jbenet/go-ipfs/util"
)
var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv *DAGService
node *Node
position int
buf *bytes.Buffer
}
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
pb := new(ft.PBData)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
return nil, err
}
switch pb.GetType() {
case ft.PBData_Directory:
// Dont allow reading directories
return nil, ErrIsDir
case ft.PBData_File:
return &DagReader{
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
}, nil
case ft.PBData_Raw:
// Raw block will just be a single level, return a byte buffer
return bytes.NewBuffer(pb.GetData()), nil
default:
return nil, ft.ErrUnrecognizedType
}
}
// Follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) {
return io.EOF
}
nxtLink := dr.node.Links[dr.position]
nxt := nxtLink.Node
if nxt == nil {
nxtNode, err := dr.serv.Get(u.Key(nxtLink.Hash))
if err != nil {
return err
}
nxt = nxtNode
}
pb := new(ft.PBData)
err := proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
}
dr.position++
switch pb.GetType() {
case ft.PBData_Directory:
return ft.ErrInvalidDirLocation
case ft.PBData_File:
//TODO: this *should* work, needs testing first
//return NewDagReader(nxt, dr.serv)
panic("Not yet handling different layers of indirection!")
case ft.PBData_Raw:
dr.buf = bytes.NewBuffer(pb.GetData())
return nil
default:
return ft.ErrUnrecognizedType
}
}
func (dr *DagReader) Read(b []byte) (int, error) {
// If no cached buffer, load one
if dr.buf == nil {
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
}
total := 0
for {
// Attempt to fill bytes from cached buffer
n, err := dr.buf.Read(b[total:])
total += n
if err != nil {
// EOF is expected
if err != io.EOF {
return total, err
}
}
// If weve read enough bytes, return
if total == len(b) {
return total, nil
}
// Otherwise, load up the next block
err = dr.precalcNextBuf()
if err != nil {
return total, err
}
}
}
/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
for i := 0; i < len(dr.node.Links); i++ {
nsize := dr.node.Links[i].Size - 8
if offset > nsize {
offset -= nsize
} else {
break
}
}
dr.position = i
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
case os.SEEK_CUR:
case os.SEEK_END:
default:
return 0, errors.New("invalid whence")
}
return 0, nil
}
*/
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment