dagreader.go 3.19 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package io

import (
	"bytes"
	"errors"
	"io"

	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
11
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
12 13 14 15 16 17
)

var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
18
	serv     mdag.DAGService
19 20
	node     *mdag.Node
	position int
Jeromy's avatar
Jeromy committed
21
	buf      io.Reader
22 23 24 25
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
26
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
27
	pb := new(ftpb.Data)
28 29 30 31 32 33
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
34
	case ftpb.Data_Directory:
35 36
		// Dont allow reading directories
		return nil, ErrIsDir
37
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
38
		return &DagReader{
39 40 41
			node: n,
			serv: serv,
			buf:  bytes.NewBuffer(pb.GetData()),
Jeromy's avatar
Jeromy committed
42
		}, nil
43
	case ftpb.Data_Raw:
44 45 46 47 48 49 50
		// Raw block will just be a single level, return a byte buffer
		return bytes.NewBuffer(pb.GetData()), nil
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
51
// precalcNextBuf follows the next link in line and loads it from the DAGService,
52 53 54 55 56
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
	if dr.position >= len(dr.node.Links) {
		return io.EOF
	}
Jeromy's avatar
Jeromy committed
57 58 59
	nxt, err := dr.node.Links[dr.position].GetNode(dr.serv)
	if err != nil {
		return err
60
	}
61
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
62
	err = proto.Unmarshal(nxt.Data, pb)
63 64 65 66 67 68
	if err != nil {
		return err
	}
	dr.position++

	switch pb.GetType() {
69
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
70
		// A directory should not exist within a file
71
		return ft.ErrInvalidDirLocation
72
	case ftpb.Data_File:
73
		//TODO: this *should* work, needs testing first
Jeromy's avatar
Jeromy committed
74 75 76 77 78 79 80
		log.Warning("Running untested code for multilayered indirect FS reads.")
		subr, err := NewDagReader(nxt, dr.serv)
		if err != nil {
			return err
		}
		dr.buf = subr
		return nil
81
	case ftpb.Data_Raw:
82 83 84 85 86 87 88
		dr.buf = bytes.NewBuffer(pb.GetData())
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
89
// Read reads data from the DAG structured file
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
func (dr *DagReader) Read(b []byte) (int, error) {
	// If no cached buffer, load one
	if dr.buf == nil {
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	}
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}

/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		for i := 0; i < len(dr.node.Links); i++ {
			nsize := dr.node.Links[i].Size - 8
			if offset > nsize {
				offset -= nsize
			} else {
				break
			}
		}
		dr.position = i
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	case os.SEEK_CUR:
	case os.SEEK_END:
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
*/