pbdagreader.go 7.42 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
package io

import (
	"context"
	"errors"
	"fmt"
	"io"

	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	ftpb "github.com/ipfs/go-ipfs/unixfs/pb"

	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
14
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
15
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
16 17
)

Hector Sanjuan's avatar
Hector Sanjuan committed
18 19
// PBDagReader provides a way to easily read the data contained in a dag.
type PBDagReader struct {
Jeromy's avatar
Jeromy committed
20
	serv ipld.NodeGetter
21 22 23 24 25 26 27 28 29 30 31

	// the node being read
	node *mdag.ProtoNode

	// cached protobuf structure from node.Data
	pbdata *ftpb.Data

	// the current data buffer to be read from
	// will either be a bytes.Reader or a child DagReader
	buf ReadSeekCloser

32
	// NodePromises for each of 'nodes' child links
33
	promises []*ipld.NodePromise
34

35 36 37
	// the cid of each child of the current node
	links []*cid.Cid

38 39 40 41 42 43 44 45 46 47 48 49 50
	// the index of the child link currently being read from
	linkPosition int

	// current offset for the read head within the 'file'
	offset int64

	// Our context
	ctx context.Context

	// context cancel for children
	cancel func()
}

Hector Sanjuan's avatar
Hector Sanjuan committed
51
// Compile-time assertion that *PBDagReader implements the DagReader interface.
var _ DagReader = (*PBDagReader)(nil)
Jakub Sztandera's avatar
Jakub Sztandera committed
52

Steven Allen's avatar
Steven Allen committed
53
// NewPBFileReader constructs a new PBFileReader.
Hector Sanjuan's avatar
Hector Sanjuan committed
54
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *PBDagReader {
55
	fctx, cancel := context.WithCancel(ctx)
56
	curLinks := getLinkCids(n)
Hector Sanjuan's avatar
Hector Sanjuan committed
57
	return &PBDagReader{
58 59
		node:     n,
		serv:     serv,
Hector Sanjuan's avatar
Hector Sanjuan committed
60
		buf:      newBufDagReader(pb.GetData()),
61
		promises: make([]*ipld.NodePromise, len(curLinks)),
62
		links:    curLinks,
63 64 65 66 67 68
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}

69 70
// preloadSize is the maximum number of child links requested per call to
// preloadNextNodes.
const preloadSize = 10

Hector Sanjuan's avatar
Hector Sanjuan committed
71
func (dr *PBDagReader) preloadNextNodes(ctx context.Context) {
72 73 74 75 76 77
	beg := dr.linkPosition
	end := beg + preloadSize
	if end >= len(dr.links) {
		end = len(dr.links)
	}

78
	for i, p := range ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
79 80 81 82
		dr.promises[beg+i] = p
	}
}

83 84
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
Hector Sanjuan's avatar
Hector Sanjuan committed
85
func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
86 87 88 89 90
	if dr.buf != nil {
		dr.buf.Close() // Just to make sure
		dr.buf = nil
	}

91 92 93 94
	if dr.linkPosition >= len(dr.promises) {
		return io.EOF
	}

95 96 97 98
	if dr.promises[dr.linkPosition] == nil {
		dr.preloadNextNodes(ctx)
	}

99 100 101 102
	nxt, err := dr.promises[dr.linkPosition].Get(ctx)
	if err != nil {
		return err
	}
103
	dr.promises[dr.linkPosition] = nil
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
	dr.linkPosition++

	switch nxt := nxt.(type) {
	case *mdag.ProtoNode:
		pb := new(ftpb.Data)
		err = proto.Unmarshal(nxt.Data(), pb)
		if err != nil {
			return fmt.Errorf("incorrectly formatted protobuf: %s", err)
		}

		switch pb.GetType() {
		case ftpb.Data_Directory:
			// A directory should not exist within a file
			return ft.ErrInvalidDirLocation
		case ftpb.Data_File:
			dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv)
			return nil
		case ftpb.Data_Raw:
Hector Sanjuan's avatar
Hector Sanjuan committed
122
			dr.buf = newBufDagReader(pb.GetData())
123 124 125 126 127 128 129 130 131
			return nil
		case ftpb.Data_Metadata:
			return errors.New("shouldnt have had metadata object inside file")
		case ftpb.Data_Symlink:
			return errors.New("shouldnt have had symlink inside file")
		default:
			return ft.ErrUnrecognizedType
		}
	default:
132 133 134
		var err error
		dr.buf, err = NewDagReader(ctx, nxt, dr.serv)
		return err
135 136 137
	}
}

138
func getLinkCids(n ipld.Node) []*cid.Cid {
139 140 141 142 143 144 145 146
	links := n.Links()
	out := make([]*cid.Cid, 0, len(links))
	for _, l := range links {
		out = append(out, l.Cid)
	}
	return out
}

147
// Size return the total length of the data from the DAG structured file.
Hector Sanjuan's avatar
Hector Sanjuan committed
148
func (dr *PBDagReader) Size() uint64 {
149 150 151 152
	return dr.pbdata.GetFilesize()
}

// Read reads data from the DAG structured file
Hector Sanjuan's avatar
Hector Sanjuan committed
153
func (dr *PBDagReader) Read(b []byte) (int, error) {
154 155 156 157
	return dr.CtxReadFull(dr.ctx, b)
}

// CtxReadFull reads data from the DAG structured file
Hector Sanjuan's avatar
Hector Sanjuan committed
158
func (dr *PBDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
159 160 161 162 163 164
	if dr.buf == nil {
		if err := dr.precalcNextBuf(ctx); err != nil {
			return 0, err
		}
	}

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
	// If no cached buffer, load one
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		dr.offset += int64(n)
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf(ctx)
		if err != nil {
			return total, err
		}
	}
}

Hector Sanjuan's avatar
Hector Sanjuan committed
192 193
// WriteTo writes to the given writer.
func (dr *PBDagReader) WriteTo(w io.Writer) (int64, error) {
194 195 196 197 198 199
	if dr.buf == nil {
		if err := dr.precalcNextBuf(dr.ctx); err != nil {
			return 0, err
		}
	}

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	// If no cached buffer, load one
	total := int64(0)
	for {
		// Attempt to write bytes from cached buffer
		n, err := dr.buf.WriteTo(w)
		total += n
		dr.offset += n
		if err != nil {
			if err != io.EOF {
				return total, err
			}
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf(dr.ctx)
		if err != nil {
			if err == io.EOF {
				return total, nil
			}
			return total, err
		}
	}
}

Hector Sanjuan's avatar
Hector Sanjuan committed
224 225
// Close closes the reader.
func (dr *PBDagReader) Close() error {
226 227 228 229
	dr.cancel()
	return nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
230 231
// Offset returns the current reader offset
func (dr *PBDagReader) Offset() int64 {
232 233 234 235 236 237 238
	return dr.offset
}

// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
Hector Sanjuan's avatar
Hector Sanjuan committed
239
func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
240
	switch whence {
241
	case io.SeekStart:
242 243 244
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}
245 246 247
		if offset == dr.offset {
			return offset, nil
		}
248 249 250 251 252 253 254 255

		// Grab cached protobuf object (solely to make code look cleaner)
		pb := dr.pbdata

		// left represents the number of bytes remaining to seek to (from beginning)
		left := offset
		if int64(len(pb.Data)) >= offset {
			// Close current buf to close potential child dagreader
256 257 258
			if dr.buf != nil {
				dr.buf.Close()
			}
Hector Sanjuan's avatar
Hector Sanjuan committed
259
			dr.buf = newBufDagReader(pb.GetData()[offset:])
260 261 262 263 264 265 266

			// start reading links from the beginning
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		}

Hector Sanjuan's avatar
Hector Sanjuan committed
267 268 269
		// skip past root block data
		left -= int64(len(pb.Data))

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
		// iterate through links and find where we need to be
		for i := 0; i < len(pb.Blocksizes); i++ {
			if pb.Blocksizes[i] > uint64(left) {
				dr.linkPosition = i
				break
			} else {
				left -= int64(pb.Blocksizes[i])
			}
		}

		// start sub-block request
		err := dr.precalcNextBuf(dr.ctx)
		if err != nil {
			return 0, err
		}

		// set proper offset within child readseeker
287
		n, err := dr.buf.Seek(left, io.SeekStart)
288 289 290 291 292 293 294 295 296 297 298
		if err != nil {
			return -1, err
		}

		// sanity
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil
299
	case io.SeekCurrent:
300
		// TODO: be smarter here
301 302 303 304
		if offset == 0 {
			return dr.offset, nil
		}

305
		noffset := dr.offset + offset
306 307
		return dr.Seek(noffset, io.SeekStart)
	case io.SeekEnd:
308
		noffset := int64(dr.pbdata.GetFilesize()) - offset
309 310 311 312 313 314 315 316 317 318
		n, err := dr.Seek(noffset, io.SeekStart)

		// Return negative number if we can't figure out the file size. Using io.EOF
		// for this seems to be good(-enough) solution as it's only returned by
		// precalcNextBuf when we step out of file range.
		// This is needed for gateway to function properly
		if err == io.EOF && *dr.pbdata.Type == ftpb.Data_File {
			return -1, nil
		}
		return n, err
319 320 321 322
	default:
		return 0, errors.New("invalid whence")
	}
}