dagreader.go 7.52 KB
Newer Older
1 2 3 4
package io

import (
	"bytes"
5
	"context"
6
	"errors"
7
	"fmt"
8
	"io"
Jeromy's avatar
Jeromy committed
9
	"os"
10

11 12 13
	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
14

15
	node "gx/ipfs/QmUsVJ7AEnGyjX8YWnrwq9vmECVGwBQNAKPpgz5KSg8dcq/go-ipld-node"
16
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
17 18 19 20
)

var (
	// ErrIsDir is returned by NewDagReader when the node to read is a unixfs
	// directory.
	ErrIsDir = errors.New("this dag node is a directory")

	// ErrCantReadSymlinks is returned by NewDagReader when the node to read is
	// a unixfs symlink.
	ErrCantReadSymlinks = errors.New("cannot currently read symlinks")
)

23 24
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
25 26 27
	serv mdag.DAGService

	// the node being read
28
	node *mdag.ProtoNode
Jeromy's avatar
Jeromy committed
29 30 31 32 33 34 35 36 37 38 39 40

	// cached protobuf structure from node.Data
	pbdata *ftpb.Data

	// the current data buffer to be read from
	// will either be a bytes.Reader or a child DagReader
	buf ReadSeekCloser

	// NodeGetters for each of 'nodes' child links
	promises []mdag.NodeGetter

	// the index of the child link currently being read from
Jeromy's avatar
Jeromy committed
41
	linkPosition int
Jeromy's avatar
Jeromy committed
42 43 44

	// current offset for the read head within the 'file'
	offset int64
Jeromy's avatar
Jeromy committed
45 46 47 48

	// Our context
	ctx context.Context

Jeromy's avatar
Jeromy committed
49
	// context cancel for children
Jeromy's avatar
Jeromy committed
50 51 52 53 54 55 56
	cancel func()
}

// ReadSeekCloser is what DagReader requires of the buffer it is currently
// reading from: it must be readable, seekable, closable, and able to stream
// its contents into a writer.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
	io.WriterTo
}

60 61
// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
62 63 64 65 66 67 68 69 70
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
	switch n := n.(type) {
	case *mdag.RawNode:
		return &DagReader{
			buf: NewRSNCFromBytes(n.RawData()),
		}, nil
	case *mdag.ProtoNode:
		pb := new(ftpb.Data)
		if err := proto.Unmarshal(n.Data(), pb); err != nil {
71 72
			return nil, err
		}
73

74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
		switch pb.GetType() {
		case ftpb.Data_Directory:
			// Dont allow reading directories
			return nil, ErrIsDir
		case ftpb.Data_File, ftpb.Data_Raw:
			return NewDataFileReader(ctx, n, pb, serv), nil
		case ftpb.Data_Metadata:
			if len(n.Links()) == 0 {
				return nil, errors.New("incorrectly formatted metadata object")
			}
			child, err := n.Links()[0].GetNode(ctx, serv)
			if err != nil {
				return nil, err
			}

			childpb, ok := child.(*mdag.ProtoNode)
			if !ok {
				return nil, mdag.ErrNotProtobuf
			}
			return NewDagReader(ctx, childpb, serv)
		case ftpb.Data_Symlink:
			return nil, ErrCantReadSymlinks
		default:
			return nil, ft.ErrUnrecognizedType
98
		}
99
	default:
100
		return nil, fmt.Errorf("unrecognized node type")
101 102 103
	}
}

104
func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
105
	fctx, cancel := context.WithCancel(ctx)
106
	promises := mdag.GetDAG(fctx, serv, n)
107 108 109 110 111 112 113 114 115 116 117
	return &DagReader{
		node:     n,
		serv:     serv,
		buf:      NewRSNCFromBytes(pb.GetData()),
		promises: promises,
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}

118 119
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
120
func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
Jeromy's avatar
Jeromy committed
121 122 123
	dr.buf.Close() // Just to make sure
	if dr.linkPosition >= len(dr.promises) {
		return io.EOF
Jeromy's avatar
Jeromy committed
124
	}
125 126

	nxt, err := dr.promises[dr.linkPosition].Get(ctx)
Jeromy's avatar
Jeromy committed
127 128
	if err != nil {
		return err
129
	}
Jeromy's avatar
Jeromy committed
130
	dr.linkPosition++
Jeromy's avatar
Jeromy committed
131

132 133 134 135 136 137 138
	switch nxt := nxt.(type) {
	case *mdag.ProtoNode:
		pb := new(ftpb.Data)
		err = proto.Unmarshal(nxt.Data(), pb)
		if err != nil {
			return fmt.Errorf("incorrectly formatted protobuf: %s", err)
		}
139

140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
		switch pb.GetType() {
		case ftpb.Data_Directory:
			// A directory should not exist within a file
			return ft.ErrInvalidDirLocation
		case ftpb.Data_File:
			dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
			return nil
		case ftpb.Data_Raw:
			dr.buf = NewRSNCFromBytes(pb.GetData())
			return nil
		case ftpb.Data_Metadata:
			return errors.New("shouldnt have had metadata object inside file")
		case ftpb.Data_Symlink:
			return errors.New("shouldnt have had symlink inside file")
		default:
			return ft.ErrUnrecognizedType
		}
	case *mdag.RawNode:
		dr.buf = NewRSNCFromBytes(nxt.RawData())
159 160
		return nil
	default:
161
		return errors.New("unrecognized node type in DagReader")
162 163 164
	}
}

165
// Size return the total length of the data from the DAG structured file.
rht's avatar
rht committed
166 167
func (dr *DagReader) Size() uint64 {
	return dr.pbdata.GetFilesize()
168 169
}

Jeromy's avatar
Jeromy committed
170
// Read reads data from the DAG structured file
171
func (dr *DagReader) Read(b []byte) (int, error) {
172 173 174 175 176
	return dr.CtxReadFull(dr.ctx, b)
}

// CtxReadFull reads data from the DAG structured file
func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
177 178 179 180 181 182
	// If no cached buffer, load one
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
Jeromy's avatar
Jeromy committed
183
		dr.offset += int64(n)
184 185 186 187 188 189 190 191 192 193 194 195 196
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
197
		err = dr.precalcNextBuf(ctx)
198 199 200 201 202 203
		if err != nil {
			return total, err
		}
	}
}

204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
	// If no cached buffer, load one
	total := int64(0)
	for {
		// Attempt to write bytes from cached buffer
		n, err := dr.buf.WriteTo(w)
		total += n
		dr.offset += n
		if err != nil {
			if err != io.EOF {
				return total, err
			}
		}

		// Otherwise, load up the next block
219
		err = dr.precalcNextBuf(dr.ctx)
220 221 222 223 224 225 226 227 228
		if err != nil {
			if err == io.EOF {
				return total, nil
			}
			return total, err
		}
	}
}

Jeromy's avatar
Jeromy committed
229
func (dr *DagReader) Close() error {
Jeromy's avatar
Jeromy committed
230
	dr.cancel()
Jeromy's avatar
Jeromy committed
231 232 233
	return nil
}

Jakub Sztandera's avatar
Jakub Sztandera committed
234 235 236 237
func (dr *DagReader) Offset() int64 {
	return dr.offset
}

Jeromy's avatar
Jeromy committed
238 239
// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
240 241
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
242 243 244
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
245 246 247 248
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}

Jeromy's avatar
Jeromy committed
249
		// Grab cached protobuf object (solely to make code look cleaner)
Jeromy's avatar
Jeromy committed
250
		pb := dr.pbdata
Jeromy's avatar
Jeromy committed
251 252

		// left represents the number of bytes remaining to seek to (from beginning)
Jeromy's avatar
Jeromy committed
253
		left := offset
254
		if int64(len(pb.Data)) >= offset {
Jeromy's avatar
Jeromy committed
255
			// Close current buf to close potential child dagreader
Jeromy's avatar
Jeromy committed
256
			dr.buf.Close()
Jeromy's avatar
Jeromy committed
257
			dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
Jeromy's avatar
Jeromy committed
258 259

			// start reading links from the beginning
Jeromy's avatar
Jeromy committed
260 261 262 263
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		} else {
Jeromy's avatar
Jeromy committed
264
			// skip past root block data
Jeromy's avatar
Jeromy committed
265 266 267
			left -= int64(len(pb.Data))
		}

Jeromy's avatar
Jeromy committed
268
		// iterate through links and find where we need to be
Jeromy's avatar
Jeromy committed
269
		for i := 0; i < len(pb.Blocksizes); i++ {
Jeromy's avatar
Jeromy committed
270
			if pb.Blocksizes[i] > uint64(left) {
Jeromy's avatar
Jeromy committed
271
				dr.linkPosition = i
272
				break
Jeromy's avatar
Jeromy committed
273 274
			} else {
				left -= int64(pb.Blocksizes[i])
275 276
			}
		}
Jeromy's avatar
Jeromy committed
277

Jeromy's avatar
Jeromy committed
278
		// start sub-block request
279
		err := dr.precalcNextBuf(dr.ctx)
280 281 282
		if err != nil {
			return 0, err
		}
Jeromy's avatar
Jeromy committed
283

Jeromy's avatar
Jeromy committed
284
		// set proper offset within child readseeker
Jeromy's avatar
Jeromy committed
285
		n, err := dr.buf.Seek(left, os.SEEK_SET)
Jeromy's avatar
Jeromy committed
286 287 288
		if err != nil {
			return -1, err
		}
Jeromy's avatar
Jeromy committed
289 290

		// sanity
Jeromy's avatar
Jeromy committed
291 292 293 294 295 296
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil
297
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
298
		// TODO: be smarter here
Jeromy's avatar
Jeromy committed
299 300
		noffset := dr.offset + offset
		return dr.Seek(noffset, os.SEEK_SET)
301
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
302
		noffset := int64(dr.pbdata.GetFilesize()) - offset
Jeromy's avatar
Jeromy committed
303
		return dr.Seek(noffset, os.SEEK_SET)
304 305 306 307
	default:
		return 0, errors.New("invalid whence")
	}
}
Jeromy's avatar
Jeromy committed
308

Jeromy's avatar
Jeromy committed
309
// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
Jeromy's avatar
Jeromy committed
310 311 312 313 314 315 316 317 318
type readSeekNopCloser struct {
	*bytes.Reader
}

func NewRSNCFromBytes(b []byte) ReadSeekCloser {
	return &readSeekNopCloser{bytes.NewReader(b)}
}

func (r *readSeekNopCloser) Close() error { return nil }