dagreader.go 6.48 KB
Newer Older
1
package io
2 3 4 5 6

import (
	"bytes"
	"errors"
	"io"
Jeromy's avatar
Jeromy committed
7
	"os"
8

9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
10

11
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
12 13
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
14
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
15 16
)

Siraj Ravel's avatar
Siraj Ravel committed
17
var ErrIsDir = errors.New("this dag node is a directory")
18 19 20

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
	serv mdag.DAGService

	// the node being read
	node *mdag.Node

	// cached protobuf structure from node.Data
	pbdata *ftpb.Data

	// the current data buffer to be read from
	// will either be a bytes.Reader or a child DagReader
	buf ReadSeekCloser

	// NodeGetters for each of 'nodes' child links
	promises []mdag.NodeGetter

	// the index of the child link currently being read from
Jeromy's avatar
Jeromy committed
37
	linkPosition int
Jeromy's avatar
Jeromy committed
38 39 40

	// current offset for the read head within the 'file'
	offset int64
Jeromy's avatar
Jeromy committed
41 42 43 44

	// Our context
	ctx context.Context

Jeromy's avatar
Jeromy committed
45
	// context cancel for children
Jeromy's avatar
Jeromy committed
46 47 48 49 50 51 52
	cancel func()
}

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
53
	io.WriterTo
54 55
}

Jeromy's avatar
Jeromy committed
56 57
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
58
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
59
	pb := new(ftpb.Data)
60 61 62 63
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}
64

65
	switch pb.GetType() {
66
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
67
		// Dont allow reading directories
68
		return nil, ErrIsDir
69 70
	case ftpb.Data_Raw:
		fallthrough
71
	case ftpb.Data_File:
72
		return newDataFileReader(ctx, n, pb, serv), nil
73 74 75 76 77 78 79 80 81
	case ftpb.Data_Metadata:
		if len(n.Links) == 0 {
			return nil, errors.New("incorrectly formatted metadata object")
		}
		child, err := n.Links[0].GetNode(serv)
		if err != nil {
			return nil, err
		}
		return NewDagReader(ctx, child, serv)
82
	default:
Jeromy's avatar
Jeromy committed
83
		return nil, ft.ErrUnrecognizedType
84 85 86
	}
}

87 88 89 90 91 92 93 94 95 96 97 98 99 100
func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
	fctx, cancel := context.WithCancel(ctx)
	promises := serv.GetDAG(fctx, n)
	return &DagReader{
		node:     n,
		serv:     serv,
		buf:      NewRSNCFromBytes(pb.GetData()),
		promises: promises,
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}

Jeromy's avatar
Jeromy committed
101
// precalcNextBuf follows the next link in line and loads it from the DAGService,
Jeromy's avatar
Jeromy committed
102
// setting the next buffer to read from
103
func (dr *DagReader) precalcNextBuf() error {
Jeromy's avatar
Jeromy committed
104 105 106
	dr.buf.Close() // Just to make sure
	if dr.linkPosition >= len(dr.promises) {
		return io.EOF
Jeromy's avatar
Jeromy committed
107
	}
Jeromy's avatar
Jeromy committed
108 109 110
	nxt, err := dr.promises[dr.linkPosition].Get()
	if err != nil {
		return err
111
	}
Jeromy's avatar
Jeromy committed
112
	dr.linkPosition++
Jeromy's avatar
Jeromy committed
113

114
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
115
	err = proto.Unmarshal(nxt.Data, pb)
116 117 118 119 120
	if err != nil {
		return err
	}

	switch pb.GetType() {
121
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
122
		// A directory should not exist within a file
Jeromy's avatar
Jeromy committed
123
		return ft.ErrInvalidDirLocation
124
	case ftpb.Data_File:
125
		dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
Jeromy's avatar
Jeromy committed
126
		return nil
127
	case ftpb.Data_Raw:
Jeromy's avatar
Jeromy committed
128
		dr.buf = NewRSNCFromBytes(pb.GetData())
129
		return nil
130 131
	case ftpb.Data_Metadata:
		return errors.New("Shouldnt have had metadata object inside file")
132
	default:
Jeromy's avatar
Jeromy committed
133
		return ft.ErrUnrecognizedType
134 135 136
	}
}

137 138 139 140 141
// Size return the total length of the data from the DAG structured file.
func (dr *DagReader) Size() int64 {
	return int64(dr.pbdata.GetFilesize())
}

Jeromy's avatar
Jeromy committed
142
// Read reads data from the DAG structured file
143
func (dr *DagReader) Read(b []byte) (int, error) {
Jeromy's avatar
Jeromy committed
144
	// If no cached buffer, load one
145 146
	total := 0
	for {
Jeromy's avatar
Jeromy committed
147
		// Attempt to fill bytes from cached buffer
148 149
		n, err := dr.buf.Read(b[total:])
		total += n
Jeromy's avatar
Jeromy committed
150
		dr.offset += int64(n)
151
		if err != nil {
Jeromy's avatar
Jeromy committed
152
			// EOF is expected
153 154 155 156
			if err != io.EOF {
				return total, err
			}
		}
Jeromy's avatar
Jeromy committed
157 158

		// If weve read enough bytes, return
159 160 161
		if total == len(b) {
			return total, nil
		}
Jeromy's avatar
Jeromy committed
162 163

		// Otherwise, load up the next block
164 165 166 167 168 169
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
	// If no cached buffer, load one
	total := int64(0)
	for {
		// Attempt to write bytes from cached buffer
		n, err := dr.buf.WriteTo(w)
		total += n
		dr.offset += n
		if err != nil {
			if err != io.EOF {
				return total, err
			}
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			if err == io.EOF {
				return total, nil
			}
			return total, err
		}
	}
}

Jeromy's avatar
Jeromy committed
196
func (dr *DagReader) Close() error {
Jeromy's avatar
Jeromy committed
197
	dr.cancel()
Jeromy's avatar
Jeromy committed
198 199 200
	return nil
}

Jeromy's avatar
Jeromy committed
201 202
// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
203 204
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
205 206 207
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
208 209 210 211
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}

Jeromy's avatar
Jeromy committed
212
		// Grab cached protobuf object (solely to make code look cleaner)
Jeromy's avatar
Jeromy committed
213
		pb := dr.pbdata
Jeromy's avatar
Jeromy committed
214 215

		// left represents the number of bytes remaining to seek to (from beginning)
Jeromy's avatar
Jeromy committed
216
		left := offset
217
		if int64(len(pb.Data)) >= offset {
Jeromy's avatar
Jeromy committed
218
			// Close current buf to close potential child dagreader
Jeromy's avatar
Jeromy committed
219
			dr.buf.Close()
Jeromy's avatar
Jeromy committed
220
			dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
Jeromy's avatar
Jeromy committed
221 222

			// start reading links from the beginning
Jeromy's avatar
Jeromy committed
223 224 225 226
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		} else {
Jeromy's avatar
Jeromy committed
227
			// skip past root block data
Jeromy's avatar
Jeromy committed
228 229 230
			left -= int64(len(pb.Data))
		}

Jeromy's avatar
Jeromy committed
231
		// iterate through links and find where we need to be
Jeromy's avatar
Jeromy committed
232
		for i := 0; i < len(pb.Blocksizes); i++ {
Jeromy's avatar
Jeromy committed
233
			if pb.Blocksizes[i] > uint64(left) {
Jeromy's avatar
Jeromy committed
234
				dr.linkPosition = i
235
				break
Jeromy's avatar
Jeromy committed
236 237
			} else {
				left -= int64(pb.Blocksizes[i])
238 239
			}
		}
Jeromy's avatar
Jeromy committed
240

Jeromy's avatar
Jeromy committed
241
		// start sub-block request
Jeromy's avatar
Jeromy committed
242
		err := dr.precalcNextBuf()
243 244 245
		if err != nil {
			return 0, err
		}
Jeromy's avatar
Jeromy committed
246

Jeromy's avatar
Jeromy committed
247
		// set proper offset within child readseeker
Jeromy's avatar
Jeromy committed
248
		n, err := dr.buf.Seek(left, os.SEEK_SET)
Jeromy's avatar
Jeromy committed
249 250 251
		if err != nil {
			return -1, err
		}
Jeromy's avatar
Jeromy committed
252 253

		// sanity
Jeromy's avatar
Jeromy committed
254 255 256 257 258 259
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil
260
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
261
		// TODO: be smarter here
Jeromy's avatar
Jeromy committed
262 263
		noffset := dr.offset + offset
		return dr.Seek(noffset, os.SEEK_SET)
264
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
265
		noffset := int64(dr.pbdata.GetFilesize()) - offset
Jeromy's avatar
Jeromy committed
266
		return dr.Seek(noffset, os.SEEK_SET)
267 268 269 270 271
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
Jeromy's avatar
Jeromy committed
272

Jeromy's avatar
Jeromy committed
273
// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
Jeromy's avatar
Jeromy committed
274 275 276 277 278 279 280 281 282
type readSeekNopCloser struct {
	*bytes.Reader
}

func NewRSNCFromBytes(b []byte) ReadSeekCloser {
	return &readSeekNopCloser{bytes.NewReader(b)}
}

func (r *readSeekNopCloser) Close() error { return nil }