dagreader.go 3.89 KB
Newer Older
1 2 3 4 5 6 7
package io

import (
	"bytes"
	"errors"
	"io"

Jeromy's avatar
Jeromy committed
8 9
	"code.google.com/p/go.net/context"

10 11 12
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
13
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
14 15 16 17 18 19
)

// ErrIsDir is returned by NewDagReader when the given node is a unixfs
// directory, which cannot be read as a byte stream.
var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
20 21 22 23 24
	serv         mdag.DAGService
	node         *mdag.Node
	buf          io.Reader
	fetchChan    <-chan *mdag.Node
	linkPosition int
25 26 27 28
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
29
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
30
	pb := new(ftpb.Data)
31 32 33 34 35 36
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
37
	case ftpb.Data_Directory:
38 39
		// Dont allow reading directories
		return nil, ErrIsDir
40
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
41 42 43 44
		var fetchChan <-chan *mdag.Node
		if serv != nil {
			fetchChan = serv.BatchFetch(context.TODO(), n)
		}
Jeromy's avatar
Jeromy committed
45
		return &DagReader{
Jeromy's avatar
Jeromy committed
46 47 48
			node:      n,
			serv:      serv,
			buf:       bytes.NewBuffer(pb.GetData()),
Jeromy's avatar
Jeromy committed
49
			fetchChan: fetchChan,
Jeromy's avatar
Jeromy committed
50
		}, nil
51
	case ftpb.Data_Raw:
52 53 54 55 56 57 58
		// Raw block will just be a single level, return a byte buffer
		return bytes.NewBuffer(pb.GetData()), nil
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
59
// precalcNextBuf follows the next link in line and loads it from the DAGService,
60 61
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
Jeromy's avatar
Jeromy committed
62 63
	var nxt *mdag.Node
	var ok bool
64 65

	if dr.serv == nil {
Jeromy's avatar
Jeromy committed
66 67 68 69 70 71 72 73 74
		// Only used when fetchChan is nil,
		// which only happens when passed in a nil dagservice
		// TODO: this logic is hard to follow, do it better.
		// NOTE: the only time this code is used, is during the
		//			importer tests, consider just changing those tests
		if dr.linkPosition >= len(dr.node.Links) {
			return io.EOF
		}
		nxt = dr.node.Links[dr.linkPosition].Node
75 76 77
		if nxt == nil {
			return errors.New("Got nil node back from link! and no DAGService!")
		}
Jeromy's avatar
Jeromy committed
78
		dr.linkPosition++
79 80 81 82 83 84 85 86

	} else {
		select {
		case nxt, ok = <-dr.fetchChan:
			if !ok {
				return io.EOF
			}
		}
87
	}
Jeromy's avatar
Jeromy committed
88

89
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
90
	err := proto.Unmarshal(nxt.Data, pb)
91 92 93 94 95
	if err != nil {
		return err
	}

	switch pb.GetType() {
96
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
97
		// A directory should not exist within a file
98
		return ft.ErrInvalidDirLocation
99
	case ftpb.Data_File:
100
		//TODO: this *should* work, needs testing first
Jeromy's avatar
Jeromy committed
101 102 103 104 105 106 107
		log.Warning("Running untested code for multilayered indirect FS reads.")
		subr, err := NewDagReader(nxt, dr.serv)
		if err != nil {
			return err
		}
		dr.buf = subr
		return nil
108
	case ftpb.Data_Raw:
109 110 111 112 113 114 115
		dr.buf = bytes.NewBuffer(pb.GetData())
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
116
// Read reads data from the DAG structured file
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
func (dr *DagReader) Read(b []byte) (int, error) {
	// If no cached buffer, load one
	if dr.buf == nil {
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	}
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}

/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		for i := 0; i < len(dr.node.Links); i++ {
			nsize := dr.node.Links[i].Size - 8
			if offset > nsize {
				offset -= nsize
			} else {
				break
			}
		}
		dr.position = i
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	case os.SEEK_CUR:
	case os.SEEK_END:
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
*/