pbdagreader.go 7.32 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
package io

import (
	"context"
	"errors"
	"fmt"
	"io"

	mdag "github.com/ipfs/go-ipfs/merkledag"
	ft "github.com/ipfs/go-ipfs/unixfs"
	ftpb "github.com/ipfs/go-ipfs/unixfs/pb"

	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
Steven Allen's avatar
Steven Allen committed
14
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
15
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
16 17 18 19
)

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
	// serv is used to fetch child nodes as they are needed.
	serv ipld.DAGService

	// the node being read
	node *mdag.ProtoNode

	// cached protobuf structure from node.Data
	pbdata *ftpb.Data

	// the current data buffer to be read from
	// will either be a bytes.Reader or a child DagReader
	buf ReadSeekCloser

	// NodePromises for each of 'nodes' child links; entries are filled
	// in lazily by preloadNextNodes and cleared once consumed.
	promises []*ipld.NodePromise

	// the cid of each child of the current node
	links []*cid.Cid

	// the index of the child link currently being read from
	linkPosition int

	// current offset for the read head within the 'file'
	offset int64

	// Our context; passed to child readers so they are torn down with us.
	ctx context.Context

	// context cancel for children; invoked by Close.
	cancel func()
}

// Compile-time check that pbDagReader satisfies the DagReader interface.
var _ DagReader = (*pbDagReader)(nil)

Steven Allen's avatar
Steven Allen committed
53
// NewPBFileReader constructs a new PBFileReader.
54
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.DAGService) *pbDagReader {
55
	fctx, cancel := context.WithCancel(ctx)
56
	curLinks := getLinkCids(n)
57 58 59
	return &pbDagReader{
		node:     n,
		serv:     serv,
60
		buf:      NewBufDagReader(pb.GetData()),
61
		promises: make([]*ipld.NodePromise, len(curLinks)),
62
		links:    curLinks,
63 64 65 66 67 68
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
}

69 70 71 72 73 74 75 76 77
const preloadSize = 10

func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
	beg := dr.linkPosition
	end := beg + preloadSize
	if end >= len(dr.links) {
		end = len(dr.links)
	}

78
	for i, p := range ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
79 80 81 82
		dr.promises[beg+i] = p
	}
}

83 84 85
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
86 87 88 89 90
	if dr.buf != nil {
		dr.buf.Close() // Just to make sure
		dr.buf = nil
	}

91 92 93 94
	if dr.linkPosition >= len(dr.promises) {
		return io.EOF
	}

95 96 97 98
	if dr.promises[dr.linkPosition] == nil {
		dr.preloadNextNodes(ctx)
	}

99 100 101 102
	nxt, err := dr.promises[dr.linkPosition].Get(ctx)
	if err != nil {
		return err
	}
103
	dr.promises[dr.linkPosition] = nil
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
	dr.linkPosition++

	switch nxt := nxt.(type) {
	case *mdag.ProtoNode:
		pb := new(ftpb.Data)
		err = proto.Unmarshal(nxt.Data(), pb)
		if err != nil {
			return fmt.Errorf("incorrectly formatted protobuf: %s", err)
		}

		switch pb.GetType() {
		case ftpb.Data_Directory:
			// A directory should not exist within a file
			return ft.ErrInvalidDirLocation
		case ftpb.Data_File:
			dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv)
			return nil
		case ftpb.Data_Raw:
122
			dr.buf = NewBufDagReader(pb.GetData())
123 124 125 126 127 128 129 130 131
			return nil
		case ftpb.Data_Metadata:
			return errors.New("shouldnt have had metadata object inside file")
		case ftpb.Data_Symlink:
			return errors.New("shouldnt have had symlink inside file")
		default:
			return ft.ErrUnrecognizedType
		}
	default:
132 133 134
		var err error
		dr.buf, err = NewDagReader(ctx, nxt, dr.serv)
		return err
135 136 137
	}
}

138
func getLinkCids(n ipld.Node) []*cid.Cid {
139 140 141 142 143 144 145 146
	links := n.Links()
	out := make([]*cid.Cid, 0, len(links))
	for _, l := range links {
		out = append(out, l.Cid)
	}
	return out
}

// Size return the total length of the data from the DAG structured file.
// It reads the Filesize field cached in the root node's protobuf data.
func (dr *pbDagReader) Size() uint64 {
	return dr.pbdata.GetFilesize()
}

// Read reads data from the DAG structured file. It delegates to
// CtxReadFull using the reader's own context.
func (dr *pbDagReader) Read(b []byte) (int, error) {
	return dr.CtxReadFull(dr.ctx, b)
}

// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
159 160 161 162 163 164
	if dr.buf == nil {
		if err := dr.precalcNextBuf(ctx); err != nil {
			return 0, err
		}
	}

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
	// If no cached buffer, load one
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		dr.offset += int64(n)
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf(ctx)
		if err != nil {
			return total, err
		}
	}
}

func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
193 194 195 196 197 198
	if dr.buf == nil {
		if err := dr.precalcNextBuf(dr.ctx); err != nil {
			return 0, err
		}
	}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
	// If no cached buffer, load one
	total := int64(0)
	for {
		// Attempt to write bytes from cached buffer
		n, err := dr.buf.WriteTo(w)
		total += n
		dr.offset += n
		if err != nil {
			if err != io.EOF {
				return total, err
			}
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf(dr.ctx)
		if err != nil {
			if err == io.EOF {
				return total, nil
			}
			return total, err
		}
	}
}

// Close cancels the reader's context, tearing down any in-flight child
// fetches and child readers. It always returns nil.
func (dr *pbDagReader) Close() error {
	dr.cancel()
	return nil
}

// Offset returns the current read offset within the file.
func (dr *pbDagReader) Offset() int64 {
	return dr.offset
}

// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
//
// NOTE(review): error returns are inconsistent (-1 in some paths, 0 in
// others), and SeekEnd computes filesize-offset rather than the
// filesize+offset that the io.Seeker contract specifies — presumably the
// gateway depends on this; confirm before changing.
func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}
		// Already there; avoid rebuilding buffers.
		if offset == dr.offset {
			return offset, nil
		}

		// Grab cached protobuf object (solely to make code look cleaner)
		pb := dr.pbdata

		// left represents the number of bytes remaining to seek to (from beginning)
		left := offset
		if int64(len(pb.Data)) >= offset {
			// Target lies inside the root block's inline data.
			// Close current buf to close potential child dagreader
			if dr.buf != nil {
				dr.buf.Close()
			}
			dr.buf = NewBufDagReader(pb.GetData()[offset:])

			// start reading links from the beginning
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		} else {
			// skip past root block data
			left -= int64(len(pb.Data))
		}

		// iterate through links and find where we need to be
		for i := 0; i < len(pb.Blocksizes); i++ {
			if pb.Blocksizes[i] > uint64(left) {
				dr.linkPosition = i
				break
			} else {
				left -= int64(pb.Blocksizes[i])
			}
		}

		// start sub-block request
		err := dr.precalcNextBuf(dr.ctx)
		if err != nil {
			return 0, err
		}

		// set proper offset within child readseeker
		n, err := dr.buf.Seek(left, io.SeekStart)
		if err != nil {
			return -1, err
		}

		// sanity
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil
	case io.SeekCurrent:
		// TODO: be smarter here
		if offset == 0 {
			return dr.offset, nil
		}

		// Translate into an absolute seek and recurse.
		noffset := dr.offset + offset
		return dr.Seek(noffset, io.SeekStart)
	case io.SeekEnd:
		// NOTE(review): offset is treated as a distance back from the
		// end (size - offset), not added as io.Seeker specifies.
		noffset := int64(dr.pbdata.GetFilesize()) - offset
		n, err := dr.Seek(noffset, io.SeekStart)

		// Return negative number if we can't figure out the file size. Using io.EOF
		// for this seems to be good(-enough) solution as it's only returned by
		// precalcNextBuf when we step out of file range.
		// This is needed for gateway to function properly
		if err == io.EOF && *dr.pbdata.Type == ftpb.Data_File {
			return -1, nil
		}
		return n, err
	default:
		return 0, errors.New("invalid whence")
	}
}