dagreader.go 3.57 KB
Newer Older
1 2 3 4 5 6 7
package io

import (
	"bytes"
	"errors"
	"io"

8
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
9

10 11 12
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
13
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
14 15 16 17 18 19
)

var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
20 21 22 23 24
	serv         mdag.DAGService
	node         *mdag.Node
	buf          io.Reader
	fetchChan    <-chan *mdag.Node
	linkPosition int
25 26 27 28
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
29
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
30
	pb := new(ftpb.Data)
31 32 33 34 35 36
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
37
	case ftpb.Data_Directory:
38 39
		// Dont allow reading directories
		return nil, ErrIsDir
40
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
41
		fetchChan := serv.GetDAG(context.TODO(), n)
Jeromy's avatar
Jeromy committed
42
		return &DagReader{
Jeromy's avatar
Jeromy committed
43 44 45
			node:      n,
			serv:      serv,
			buf:       bytes.NewBuffer(pb.GetData()),
Jeromy's avatar
Jeromy committed
46
			fetchChan: fetchChan,
Jeromy's avatar
Jeromy committed
47
		}, nil
48
	case ftpb.Data_Raw:
49 50 51 52 53 54 55
		// Raw block will just be a single level, return a byte buffer
		return bytes.NewBuffer(pb.GetData()), nil
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
56
// precalcNextBuf follows the next link in line and loads it from the DAGService,
57 58
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
Jeromy's avatar
Jeromy committed
59 60
	var nxt *mdag.Node
	var ok bool
61

Jeromy's avatar
Jeromy committed
62 63 64 65 66 67 68 69 70
	if dr.fetchChan == nil {
		// This panic is appropriate because the select statement
		// will not panic if you try and read from a nil channel
		// it will simply hang.
		panic("fetchChan should NOT be nil")
	}
	select {
	case nxt, ok = <-dr.fetchChan:
		if !ok {
Jeromy's avatar
Jeromy committed
71 72
			return io.EOF
		}
73
	}
Jeromy's avatar
Jeromy committed
74

75
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
76
	err := proto.Unmarshal(nxt.Data, pb)
77 78 79 80 81
	if err != nil {
		return err
	}

	switch pb.GetType() {
82
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
83
		// A directory should not exist within a file
84
		return ft.ErrInvalidDirLocation
85
	case ftpb.Data_File:
86
		//TODO: this *should* work, needs testing first
Jeromy's avatar
Jeromy committed
87 88 89 90 91 92 93
		log.Warning("Running untested code for multilayered indirect FS reads.")
		subr, err := NewDagReader(nxt, dr.serv)
		if err != nil {
			return err
		}
		dr.buf = subr
		return nil
94
	case ftpb.Data_Raw:
95 96 97 98 99 100 101
		dr.buf = bytes.NewBuffer(pb.GetData())
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
102
// Read reads data from the DAG structured file
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
func (dr *DagReader) Read(b []byte) (int, error) {
	// If no cached buffer, load one
	if dr.buf == nil {
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	}
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}

/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		for i := 0; i < len(dr.node.Links); i++ {
			nsize := dr.node.Links[i].Size - 8
			if offset > nsize {
				offset -= nsize
			} else {
				break
			}
		}
		dr.position = i
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	case os.SEEK_CUR:
	case os.SEEK_END:
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
*/