Commit ecd031d3 authored by Lucas Molas, committed by Łukasz Magiera

unixfs: decouple the DAG traversal logic from the DAG reader

Decouple the DAG traversal logic from the `PBDagReader`, encapsulating it in
the new `Walker` structure.

Collapse the PB and Buffer DAG readers into a single `dagReader`, removing the
`bufdagreader.go` and `pbdagreader.go` files and moving all the code to
`dagreader.go`.

Remove `TestSeekAndReadLarge` and `TestReadAndCancel`, which operated directly
on the `NodePromise` structure that is now abstracted away behind
`NavigableIPLDNode` in the `go-ipld-format` repo, where those tests should be
recreated.
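
As an illustration of the call-site change (a minimal sketch mirroring the tar
`Writer` diff below; the variable names are assumptions):

    // Before: type-specific constructor, no error to handle.
    dagr := uio.NewPBFileReader(ctx, protoNode, fsNode, dagService)

    // After: a single constructor for all node types; it can fail,
    // e.g. for directories (ErrIsDir) or symlinks (ErrCantReadSymlinks).
    dagr, err := uio.NewDagReader(ctx, node, dagService)
    if err != nil {
        return err
    }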

License: MIT
Signed-off-by: Lucas Molas <schomatis@gmail.com>
parent dc3383a2
@@ -60,7 +60,11 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, fsNode *ft.FSNode, fpath string)
return err
}
dagr := uio.NewPBFileReader(w.ctx, nd, fsNode, w.Dag)
dagr, err := uio.NewDagReader(w.ctx, nd, w.Dag)
if err != nil {
return err
}
if _, err := dagr.WriteTo(w.TarW); err != nil {
return err
}
......
package io
import (
"bytes"
"context"
)
// BufDagReader implements a DagReader that reads from a byte slice
// using a bytes.Reader. It is used for RawNodes.
type BufDagReader struct {
*bytes.Reader
}
// NewBufDagReader returns a DAG reader for the given byte slice.
// BufDagReader is used to read RawNodes.
func NewBufDagReader(b []byte) *BufDagReader {
return &BufDagReader{bytes.NewReader(b)}
}
var _ DagReader = (*BufDagReader)(nil)
// Close is a nop.
func (*BufDagReader) Close() error {
return nil
}
// CtxReadFull reads the slice into b.
func (rd *BufDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
return rd.Read(b)
}
// Size returns the size of the buffer.
func (rd *BufDagReader) Size() uint64 {
s := rd.Reader.Size()
if s < 0 {
panic("size smaller than 0 (impossible!!)")
}
return uint64(s)
}
package io
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
unixfs "github.com/ipfs/go-unixfs"
)
// Common errors
@@ -17,6 +19,10 @@ var (
ErrUnkownNodeType = errors.New("unknown node type")
)
// TODO: Rename the `DagReader` interface, this doesn't read *any* DAG, just
// DAGs with UnixFS nodes (and it *belongs* to the `unixfs` package). Some
// alternatives: `FileReader`, `UnixFSFileReader`, `UnixFSReader`.
// A DagReader provides read-only read and seek access to a unixfs file.
// Different implementations of readers are used for the different
// types of unixfs/protobuf-encoded nodes.
@@ -35,24 +41,29 @@ type ReadSeekCloser interface {
}
// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retrieval
// the given node, using the passed in DAGService for data retrieval.
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
var size uint64
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
size = uint64(len(n.RawData()))
case *mdag.ProtoNode:
fsNode, err := ft.FSNodeFromBytes(n.Data())
fsNode, err := unixfs.FSNodeFromBytes(n.Data())
if err != nil {
return nil, err
}
switch fsNode.Type() {
case ft.TDirectory, ft.THAMTShard:
case unixfs.TFile, unixfs.TRaw:
size = fsNode.FileSize()
case unixfs.TDirectory, unixfs.THAMTShard:
// Don't allow reading directories
return nil, ErrIsDir
case ft.TFile, ft.TRaw:
return NewPBFileReader(ctx, n, fsNode, serv), nil
case ft.TMetadata:
case unixfs.TMetadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
}
@@ -66,12 +77,353 @@ func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagRe
return nil, mdag.ErrNotProtobuf
}
return NewDagReader(ctx, childpb, serv)
case ft.TSymlink:
case unixfs.TSymlink:
return nil, ErrCantReadSymlinks
default:
return nil, ft.ErrUnrecognizedType
return nil, unixfs.ErrUnrecognizedType
}
default:
return nil, ErrUnkownNodeType
}
ctxWithCancel, cancel := context.WithCancel(ctx)
return &dagReader{
ctx: ctxWithCancel,
cancel: cancel,
serv: serv,
size: size,
rootNode: n,
dagWalker: ipld.NewWalker(ctxWithCancel, ipld.NewNavigableIPLDNode(n, serv)),
}, nil
}
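For context, a minimal usage sketch of the constructor above (assuming `ctx`,
a UnixFS file `node`, and a node getter `serv` are in scope; error handling
reduced to a plain return):

    dr, err := NewDagReader(ctx, node, serv)
    if err != nil {
        return err
    }
    defer dr.Close()

    data, err := ioutil.ReadAll(dr) // `dagReader` implements `io.Reader`.
    if err != nil {
        return err
    }
    if uint64(len(data)) != dr.Size() {
        return errors.New("short read")
    }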
// dagReader provides a way to easily read the data contained in a dag.
type dagReader struct {
// Structure to perform the DAG iteration and search, the reader
// just needs to add logic to the `Visitor` callback passed to
// `Iterate` and `Seek`.
dagWalker *ipld.Walker
// Buffer with the data extracted from the current node being visited.
// To avoid revisiting a node to complete a (potential) partial read
// (or read after seek) the node's data is fully extracted in a single
// `readNodeDataBuffer` operation.
currentNodeData *bytes.Reader
// Implements the `Size()` API.
size uint64
// Current offset for the read head within the DAG file.
offset int64
// Root node of the DAG, stored to re-create the `dagWalker` (effectively
// re-setting the position of the reader, used during `Seek`).
rootNode ipld.Node
// Context passed to the `dagWalker`, the `cancel` function is used to
// cancel read operations (cancelling requested child node promises,
// see `ipld.NavigableIPLDNode.FetchChild` for details).
ctx context.Context
cancel func()
// Passed to the `dagWalker` that will use it to request nodes.
// TODO: Revisit name.
serv ipld.NodeGetter
}
// Size returns the total size of the data from the DAG structured file.
func (dr *dagReader) Size() uint64 {
return dr.size
}
// Read implements the `io.Reader` interface through the `CtxReadFull`
// method using the DAG reader's internal context.
func (dr *dagReader) Read(b []byte) (int, error) {
return dr.CtxReadFull(dr.ctx, b)
}
// CtxReadFull reads data from the DAG structured file. It always
// attempts a full read of the DAG until the `out` buffer is full.
// It uses the `Walker` structure to iterate the file DAG and read
// every node's data into the `out` buffer.
func (dr *dagReader) CtxReadFull(ctx context.Context, out []byte) (n int, err error) {
// Set the `dagWalker`'s context to the `ctx` argument, it will be used
// to fetch the child node promises (see
// `ipld.NavigableIPLDNode.FetchChild` for details).
dr.dagWalker.SetContext(ctx)
// If there was a partially read buffer from the last visited
// node read it before visiting a new one.
if dr.currentNodeData != nil {
// TODO: Move this check inside `readNodeDataBuffer`?
n = dr.readNodeDataBuffer(out)
if n == len(out) {
return n, nil
// Output buffer full, no need to traverse the DAG.
}
}
// Iterate the DAG calling the passed `Visitor` function on every node
// to read its data into the `out` buffer, stop if there is an error or
// if the entire DAG is traversed (`EndOfDag`).
err = dr.dagWalker.Iterate(func(visitedNode ipld.NavigableNode) error {
node := ipld.ExtractIPLDNode(visitedNode)
// Skip internal nodes, they shouldn't have any file data
// (see the `balanced` package for more details).
if len(node.Links()) > 0 {
return nil
}
err = dr.saveNodeData(node)
if err != nil {
return err
}
// Save the leaf node file data in a buffer in case it is only
// partially read now and future `CtxReadFull` calls reclaim the
// rest (as each node is visited only once during `Iterate`).
//
// TODO: We could check if the entire node's data can fit in the
// remaining `out` buffer free space to skip this intermediary step.
n += dr.readNodeDataBuffer(out[n:])
if n == len(out) {
// Output buffer full, no need to keep traversing the DAG,
// signal the `Walker` to pause the iteration.
dr.dagWalker.Pause()
}
return nil
})
if err == ipld.EndOfDag {
return n, io.EOF
// Reached the end of the (DAG) file, no more data to read.
} else if err != nil {
return n, err
// Pass along any other errors from the `Visitor`.
}
return n, nil
}
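A usage sketch of the caller-supplied context (loosely based on the removed
`TestReadAndCancel`; `dr` is assumed to be a `DagReader` over a file):
cancelling `ctx` aborts the child-node fetches requested through the `Walker`:

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    buf := make([]byte, 100)
    n, err := dr.CtxReadFull(ctx, buf)
    if err != nil && err != io.EOF {
        return err
    }
    // buf[:n] now holds the first bytes of the file; a later call
    // resumes from the partially read `currentNodeData` buffer.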
// Save the UnixFS `node`'s data into the internal `currentNodeData` buffer to
// later move it to the output buffer (`Read`) or seek into it (`Seek`).
func (dr *dagReader) saveNodeData(node ipld.Node) error {
extractedNodeData, err := unixfs.ReadUnixFSNodeData(node)
if err != nil {
return err
}
dr.currentNodeData = bytes.NewReader(extractedNodeData)
return nil
}
// Read the `currentNodeData` buffer into `out`. This function can't
// return any errors as it's always reading from a `bytes.Reader` and
// asking only for the data available in it.
func (dr *dagReader) readNodeDataBuffer(out []byte) int {
n, _ := dr.currentNodeData.Read(out)
// Ignore the error: EOF may not be returned on the first `Read`
// call; the `Len()` check below determines whether the buffer
// has been fully consumed.
if dr.currentNodeData.Len() == 0 {
dr.currentNodeData = nil
// Signal that the buffer was consumed (for later `Read` calls).
// This shouldn't return an EOF error as it's just the end of a
// single node's data, not the entire DAG.
}
dr.offset += int64(n)
// TODO: Should `offset` be incremented here or in the calling function?
// (Doing it here saves LoC but may be confusing as it's more hidden).
return n
}
// WriteTo writes to the given writer.
//
// TODO: Improve performance. It would be better to progressively
// write each node to the writer on every visit instead of allocating
// a huge buffer; that would imply defining a `Visitor` very similar
// to the one used in `CtxReadFull` (that would write to the `io.Writer`
// instead of reading into the `currentNodeData` buffer). More
// consideration is needed to restructure those two `Visitor` functions
// to avoid repeating code.
func (dr *dagReader) WriteTo(w io.Writer) (int64, error) {
writeBuf, err := ioutil.ReadAll(dr)
if err != nil {
return 0, err
}
return bytes.NewReader(writeBuf).WriteTo(w)
}
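As a rough illustration of the TODO above (an assumption about a possible
future shape, not part of this commit), such a progressive `WriteTo` could
reuse the iteration pattern of `CtxReadFull`:

    var total int64
    err := dr.dagWalker.Iterate(func(visitedNode ipld.NavigableNode) error {
        node := ipld.ExtractIPLDNode(visitedNode)
        if len(node.Links()) > 0 {
            return nil // Internal nodes carry no file data.
        }
        data, err := unixfs.ReadUnixFSNodeData(node)
        if err != nil {
            return err
        }
        n, err := w.Write(data)
        total += int64(n)
        return err
    })
    if err == ipld.EndOfDag {
        err = nil // End of the file, not an error for `WriteTo`.
    }
    return total, err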
// Close the reader (cancelling fetch node operations requested with
// the internal context, that is, `Read` calls but not `CtxReadFull`
// with user-supplied contexts).
func (dr *dagReader) Close() error {
dr.cancel()
return nil
}
// Seek implements `io.Seeker`, seeking to a given offset in the DAG file;
// it matches the standard unix `seek`. It moves the position of the internal
// `dagWalker` and may also leave a `currentNodeData` buffer loaded in case
// the seek lands in the middle of a node's data.
//
// TODO: Support seeking from the current position (relative seek)
// through the `dagWalker` in `io.SeekCurrent`.
func (dr *dagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
if offset < 0 {
return -1, errors.New("invalid offset")
}
if offset == dr.offset {
return offset, nil
// Already at the requested `offset`, nothing to do.
}
left := offset
// Amount left to seek.
// Seek from the beginning of the DAG.
dr.resetPosition()
// Use the internal reader's context to fetch the child node promises
// (see `ipld.NavigableIPLDNode.FetchChild` for details).
dr.dagWalker.SetContext(dr.ctx)
// TODO: Performance: we could adjust here `preloadSize` of
// `ipld.NavigableIPLDNode` also, when seeking we only want
// to fetch one child at a time.
// Seek the DAG by calling the provided `Visitor` function on every
// node the `dagWalker` descends to while searching, which can be
// either an internal or a leaf node. In the internal node case, check
// the child node sizes and set the corresponding child index to go
// down to next. In the leaf case (last visit of the search), if there
// is still an amount `left` to seek, do it inside the node's data
// saved in the `currentNodeData` buffer, leaving it ready for a `Read`
// call.
err := dr.dagWalker.Seek(func(visitedNode ipld.NavigableNode) error {
node := ipld.ExtractIPLDNode(visitedNode)
if len(node.Links()) > 0 {
// Internal node, should be a `mdag.ProtoNode` containing a
// `unixfs.FSNode` (see the `balanced` package for more details).
fsNode, err := unixfs.ExtractFSNode(node)
if err != nil {
return err
}
// If there aren't enough size hints don't seek
// (see the `io.EOF` handling error comment below).
if fsNode.NumChildren() != len(node.Links()) {
return io.EOF
}
// Internal nodes have no data, so just iterate through the
// sizes of its children (advancing the child index of the
// `dagWalker`) to find where we need to go down to next in
// the search.
for {
childSize := fsNode.BlockSize(int(dr.dagWalker.ActiveChildIndex()))
if childSize > uint64(left) {
// This child's data contains the position requested
// in `offset`, go down this child.
return nil
}
// Else, skip this child.
left -= int64(childSize)
err := dr.dagWalker.NextChild()
if err == ipld.ErrNextNoChild {
// No more child nodes available, nothing to do,
// the `Seek` will stop on its own.
return nil
} else if err != nil {
return err
// Pass along any other errors (that may in future
// implementations be returned by `Next`) to stop
// the search.
}
}
} else {
// Leaf node, seek inside its data.
err := dr.saveNodeData(node)
if err != nil {
return err
}
_, err = dr.currentNodeData.Seek(left, io.SeekStart)
if err != nil {
return err
}
// The corner case of a DAG consisting only of a single (leaf)
// node should make no difference here. In that case, where the
// node doesn't have a parent UnixFS node with size hints, this
// implementation would allow `Seek` to be called with an
// argument larger than the buffer size, which normally wouldn't
// happen (because we would skip the node based on the size
// hint), but that would just mean that a future `CtxReadFull`
// call would read no data from the `currentNodeData` buffer.
// TODO: Re-check this reasoning.
return nil
// In the leaf node case the search will stop here.
}
})
if err == io.EOF {
// TODO: Taken from https://github.com/ipfs/go-ipfs/pull/4320,
// check if still valid.
// Return negative number if we can't figure out the file size. Using io.EOF
// for this seems to be good(-enough) solution as it's only returned by
// precalcNextBuf when we step out of file range.
// This is needed for gateway to function properly
return -1, nil
}
if err != nil {
return 0, err
}
dr.offset = offset
return dr.offset, nil
case io.SeekCurrent:
if offset == 0 {
return dr.offset, nil
}
return dr.Seek(dr.offset+offset, io.SeekStart)
// TODO: Performance. This can be improved by supporting relative
// seeks in the `Walker` (see `Walker.Seek`).
case io.SeekEnd:
return dr.Seek(int64(dr.Size())-offset, io.SeekStart)
default:
return 0, errors.New("invalid whence")
}
}
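A seek-then-read sketch (loosely based on the removed `TestSeekAndReadLarge`;
`reader` is assumed to be a `DagReader` over a file of at least 10100 bytes):

    if _, err := reader.Seek(10000, io.SeekStart); err != nil {
        return err
    }
    buf := make([]byte, 100)
    if _, err := io.ReadFull(reader, buf); err != nil {
        return err
    }
    // buf now holds bytes [10000, 10100) of the file.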
// Reset the reader position by resetting the `dagWalker` and discarding
// any partially used node's data in the `currentNodeData` buffer, used
// in the `SeekStart` case.
func (dr *dagReader) resetPosition() {
dr.currentNodeData = nil
dr.dagWalker = ipld.NewWalker(dr.ctx, ipld.NewNavigableIPLDNode(dr.rootNode, dr.serv))
// TODO: This could be avoided (along with storing the `dr.rootNode` and
// `dr.serv` just for this call) if `Reset` is supported in the `Walker`.
}
@@ -4,7 +4,6 @@ import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"
@@ -73,90 +72,6 @@ func TestSeekAndRead(t *testing.T) {
}
}
func TestSeekAndReadLarge(t *testing.T) {
dserv := testu.GetDAGServ()
inbuf := make([]byte, 20000)
rand.Read(inbuf)
node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background())
defer closer()
reader, err := NewDagReader(ctx, node, dserv)
if err != nil {
t.Fatal(err)
}
_, err = reader.Seek(10000, io.SeekStart)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 100)
_, err = io.ReadFull(reader, buf)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, inbuf[10000:10100]) {
t.Fatal("seeked read failed")
}
pbdr := reader.(*PBDagReader)
var count int
for i, p := range pbdr.promises {
if i > 20 && i < 30 {
if p == nil {
t.Fatal("expected index to be not nil: ", i)
}
count++
} else {
if p != nil {
t.Fatal("expected index to be nil: ", i)
}
}
}
// -1 because we read some and it cleared one
if count != preloadSize-1 {
t.Fatalf("expected %d preloaded promises, got %d", preloadSize-1, count)
}
}
func TestReadAndCancel(t *testing.T) {
dserv := testu.GetDAGServ()
inbuf := make([]byte, 20000)
rand.Read(inbuf)
node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background())
defer closer()
reader, err := NewDagReader(ctx, node, dserv)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
buf := make([]byte, 100)
_, err = reader.CtxReadFull(ctx, buf)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, inbuf[0:100]) {
t.Fatal("read failed")
}
cancel()
b, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(inbuf[100:], b) {
t.Fatal("buffers not equal")
}
}
func TestRelativeSeek(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())
......
package io
import (
"context"
"errors"
"fmt"
"io"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
)
// PBDagReader provides a way to easily read the data contained in a dag.
type PBDagReader struct {
serv ipld.NodeGetter
// UnixFS file (it should be of type `Data_File` or `Data_Raw` only).
file *ft.FSNode
// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf ReadSeekCloser
// NodePromises for each of 'nodes' child links
promises []*ipld.NodePromise
// the cid of each child of the current node
links []cid.Cid
// the index of the child link currently being read from
linkPosition int
// current offset for the read head within the 'file'
offset int64
// Our context
ctx context.Context
// context cancel for children
cancel func()
}
var _ DagReader = (*PBDagReader)(nil)
// NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, file *ft.FSNode, serv ipld.NodeGetter) *PBDagReader {
fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n)
return &PBDagReader{
serv: serv,
buf: NewBufDagReader(file.Data()),
promises: make([]*ipld.NodePromise, len(curLinks)),
links: curLinks,
ctx: fctx,
cancel: cancel,
file: file,
}
}
const preloadSize = 10
func (dr *PBDagReader) preload(ctx context.Context, beg int) {
end := beg + preloadSize
if end >= len(dr.links) {
end = len(dr.links)
}
copy(dr.promises[beg:], ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]))
}
// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
if dr.buf != nil {
dr.buf.Close() // Just to make sure
dr.buf = nil
}
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}
// If we drop to <= preloadSize/2 preloading nodes, preload the next 10.
for i := dr.linkPosition; i < dr.linkPosition+preloadSize/2 && i < len(dr.promises); i++ {
// TODO: check if canceled.
if dr.promises[i] == nil {
dr.preload(ctx, i)
break
}
}
nxt, err := dr.promises[dr.linkPosition].Get(ctx)
dr.promises[dr.linkPosition] = nil
switch err {
case nil:
case context.DeadlineExceeded, context.Canceled:
err = ctx.Err()
if err != nil {
return ctx.Err()
}
// In this case, the context used to *preload* the node has been canceled.
// We need to retry the load with our context and we might as
// well preload some extra nodes while we're at it.
//
// Note: When using `Read`, this code will never execute as
// `Read` will use the global context. It only runs if the user
// explicitly reads with a custom context (e.g., by calling
// `CtxReadFull`).
dr.preload(ctx, dr.linkPosition)
nxt, err = dr.promises[dr.linkPosition].Get(ctx)
dr.promises[dr.linkPosition] = nil
if err != nil {
return err
}
default:
return err
}
dr.linkPosition++
return dr.loadBufNode(nxt)
}
func (dr *PBDagReader) loadBufNode(node ipld.Node) error {
switch node := node.(type) {
case *mdag.ProtoNode:
fsNode, err := ft.FSNodeFromBytes(node.Data())
if err != nil {
return fmt.Errorf("incorrectly formatted protobuf: %s", err)
}
switch fsNode.Type() {
case ft.TFile:
dr.buf = NewPBFileReader(dr.ctx, node, fsNode, dr.serv)
return nil
case ft.TRaw:
dr.buf = NewBufDagReader(fsNode.Data())
return nil
default:
return fmt.Errorf("found %s node in unexpected place", fsNode.Type().String())
}
case *mdag.RawNode:
dr.buf = NewBufDagReader(node.RawData())
return nil
default:
return ErrUnkownNodeType
}
}
func getLinkCids(n ipld.Node) []cid.Cid {
links := n.Links()
out := make([]cid.Cid, 0, len(links))
for _, l := range links {
out = append(out, l.Cid)
}
return out
}
// Size returns the total length of the data from the DAG structured file.
func (dr *PBDagReader) Size() uint64 {
return dr.file.FileSize()
}
// Read reads data from the DAG structured file
func (dr *PBDagReader) Read(b []byte) (int, error) {
return dr.CtxReadFull(dr.ctx, b)
}
// CtxReadFull reads data from the DAG structured file
func (dr *PBDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := 0
for {
// Attempt to fill bytes from cached buffer
n, err := io.ReadFull(dr.buf, b[total:])
total += n
dr.offset += int64(n)
switch err {
// io.EOF will happen if dr.buf had nothing more to read (n == 0)
case io.EOF, io.ErrUnexpectedEOF:
// do nothing
case nil:
return total, nil
default:
return total, err
}
// if we are not done with the output buffer load next block
err = dr.precalcNextBuf(ctx)
if err != nil {
return total, err
}
}
}
// WriteTo writes to the given writer.
func (dr *PBDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err
}
}
// If no cached buffer, load one
total := int64(0)
for {
// Attempt to write bytes from cached buffer
n, err := dr.buf.WriteTo(w)
total += n
dr.offset += n
if err != nil {
if err != io.EOF {
return total, err
}
}
// Otherwise, load up the next block
err = dr.precalcNextBuf(dr.ctx)
if err != nil {
if err == io.EOF {
return total, nil
}
return total, err
}
}
}
// Close closes the reader.
func (dr *PBDagReader) Close() error {
dr.cancel()
return nil
}
// Seek implements io.Seeker, and will seek to a given offset in the file;
// the interface matches standard unix seek.
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
if offset < 0 {
return -1, errors.New("invalid offset")
}
if offset == dr.offset {
return offset, nil
}
// left represents the number of bytes remaining to seek to (from beginning)
left := offset
if int64(len(dr.file.Data())) >= offset {
// Close current buf to close potential child dagreader
if dr.buf != nil {
dr.buf.Close()
}
dr.buf = NewBufDagReader(dr.file.Data()[offset:])
// start reading links from the beginning
dr.linkPosition = 0
dr.offset = offset
return offset, nil
}
// skip past root block data
left -= int64(len(dr.file.Data()))
// iterate through links and find where we need to be
for i := 0; i < dr.file.NumChildren(); i++ {
if dr.file.BlockSize(i) > uint64(left) {
dr.linkPosition = i
break
} else {
left -= int64(dr.file.BlockSize(i))
}
}
// start sub-block request
err := dr.precalcNextBuf(dr.ctx)
if err != nil {
return 0, err
}
// set proper offset within child readseeker
n, err := dr.buf.Seek(left, io.SeekStart)
if err != nil {
return -1, err
}
// sanity
left -= n
if left != 0 {
return -1, errors.New("failed to seek properly")
}
dr.offset = offset
return offset, nil
case io.SeekCurrent:
// TODO: be smarter here
if offset == 0 {
return dr.offset, nil
}
noffset := dr.offset + offset
return dr.Seek(noffset, io.SeekStart)
case io.SeekEnd:
noffset := int64(dr.file.FileSize()) - offset
n, err := dr.Seek(noffset, io.SeekStart)
// Return negative number if we can't figure out the file size. Using io.EOF
// for this seems to be good(-enough) solution as it's only returned by
// precalcNextBuf when we step out of file range.
// This is needed for gateway to function properly
if err == io.EOF && dr.file.Type() == ft.TFile {
return -1, nil
}
return n, err
default:
return 0, errors.New("invalid whence")
}
}
@@ -5,9 +5,9 @@ package unixfs
import (
"errors"
"fmt"
proto "github.com/gogo/protobuf/proto"
dag "github.com/ipfs/go-merkledag"
ipld "github.com/ipfs/go-ipld-format"
@@ -355,3 +355,54 @@ func BytesForMetadata(m *Metadata) ([]byte, error) {
func EmptyDirNode() *dag.ProtoNode {
return dag.NodeWithData(FolderPBData())
}
// ReadUnixFSNodeData extracts the UnixFS data from an IPLD node.
// Raw nodes are (also) processed because they are used as leaf
// nodes containing (only) UnixFS data.
func ReadUnixFSNodeData(node ipld.Node) (data []byte, err error) {
switch node := node.(type) {
case *dag.ProtoNode:
fsNode, err := FSNodeFromBytes(node.Data())
if err != nil {
return nil, fmt.Errorf("incorrectly formatted protobuf: %s", err)
}
switch fsNode.Type() {
case pb.Data_File, pb.Data_Raw:
return fsNode.Data(), nil
// Only leaf nodes (of type `Data_Raw`) contain data but due to a
// bug the `Data_File` type (normally used for internal nodes) is
// also used for leaf nodes, so both types are accepted here
// (see the `balanced` package for more details).
default:
return nil, fmt.Errorf("found %s node in unexpected place",
fsNode.Type().String())
}
case *dag.RawNode:
return node.RawData(), nil
default:
return nil, ErrUnrecognizedType
// TODO: `ErrUnrecognizedType` is reused here to avoid rewriting the
// error message, but a dedicated error should be defined for this case
// (in the `merkledag` or `go-ipld-format` packages).
}
}
// ExtractFSNode extracts the `unixfs.FSNode` from the `ipld.Node` (assuming
// it is implemented by a `mdag.ProtoNode`).
func ExtractFSNode(node ipld.Node) (*FSNode, error) {
protoNode, ok := node.(*dag.ProtoNode)
if !ok {
return nil, errors.New("expected a ProtoNode as internal node")
}
fsNode, err := FSNodeFromBytes(protoNode.Data())
if err != nil {
return nil, err
}
return fsNode, nil
}
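A sketch of how the two helpers above complement each other (a hypothetical
`describeNode` helper, not part of this commit), mirroring the internal/leaf
split used by the reader's `Visitor` functions:

    func describeNode(node ipld.Node) (string, error) {
        if len(node.Links()) == 0 {
            // Leaf node: extract its UnixFS (or raw) file data.
            data, err := ReadUnixFSNodeData(node)
            if err != nil {
                return "", err
            }
            return fmt.Sprintf("leaf with %d data bytes", len(data)), nil
        }
        // Internal node: inspect the child size hints used by `Seek`.
        fsNode, err := ExtractFSNode(node)
        if err != nil {
            return "", err
        }
        return fmt.Sprintf("internal node with %d children", fsNode.NumChildren()), nil
    }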