Commit 73f9a90c authored by Kevin Atkinson

Create a FilestoreNode object to carry PosInfo

When doing a filestore add, we wrap whatever nodes we create in a
FilestoreNode object and add the PosInfo to it so that the filestore
will be able to extract information as needed.

Edited by whyrusleeping

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
parent 6f3ae5da
......@@ -55,3 +55,8 @@ type SizeFile interface {
Size() (int64, error)
}
// FileInfo is implemented by file objects that can report the path and
// os.FileInfo of the underlying file on disk. The adder type-asserts to
// this interface to recover position information for the filestore.
type FileInfo interface {
FullPath() string
Stat() os.FileInfo
}
......@@ -398,7 +398,12 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{file: file, out: adder.Out}
rdr := &progressReader{file: file, out: adder.Out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
reader = rdr
}
}
dagnode, err := adder.add(reader)
......@@ -520,3 +525,8 @@ func (i *progressReader) Read(p []byte) (int, error) {
return n, err
}
// progressReader2 couples a progressReader with the files.FileInfo of
// the file being read, so a later type assertion on the reader (e.g. in
// DagBuilderParams.New) can still discover the source file's path and stat.
type progressReader2 struct {
*progressReader
files.FileInfo
}
......@@ -9,10 +9,12 @@ import (
)
func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
var offset uint64 = 0
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {
nroot := h.NewUnixfsNode()
db.SetPosInfo(nroot, 0)
// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
......@@ -22,11 +24,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
}
// fill it up.
if err := fillNodeRec(db, nroot, level); err != nil {
if err := fillNodeRec(db, nroot, level, offset); err != nil {
return nil, err
}
offset = nroot.FileSize()
root = nroot
}
if root == nil {
root = h.NewUnixfsNode()
......@@ -50,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
......@@ -69,8 +73,9 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
db.SetPosInfo(child, offset)
err := fillNodeRec(db, child, depth-1)
err := fillNodeRec(db, child, depth-1, offset)
if err != nil {
return err
}
......@@ -78,6 +83,7 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
if err := node.AddChild(child, db); err != nil {
return err
}
offset += child.FileSize()
}
return nil
......
......@@ -10,7 +10,8 @@ import (
var IpfsRabinPoly = chunker.Pol(17437180132763653)
// Rabin splits input into content-defined chunks using a Rabin
// fingerprint rolling hash. It keeps the raw underlying reader so that
// Reader() can expose it (used to recover file metadata such as
// files.FileInfo via type assertion).
//
// NOTE: the original span declared the field `r` twice — a leftover of
// the stripped diff markers — which is a compile error in Go; the
// duplicate is removed here.
type Rabin struct {
	r      *chunker.Chunker // the rolling-hash chunker wrapping reader
	reader io.Reader        // the raw input, preserved for Reader()
}
func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
......@@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)
return &Rabin{
r: ch,
r: ch,
reader: r,
}
}
......@@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {
return ch.Data, nil
}
// Reader returns the raw io.Reader this splitter was constructed with
// (not the chunker wrapper), satisfying the Splitter interface.
func (r *Rabin) Reader() io.Reader {
return r.reader
}
......@@ -12,6 +12,7 @@ var log = logging.Logger("chunk")
var DefaultBlockSize int64 = 1024 * 256
// Splitter reads a stream and produces it back as a sequence of chunks.
type Splitter interface {
// Reader returns the underlying raw input stream.
Reader() io.Reader
// NextBytes returns the next chunk of data, or an error.
NextBytes() ([]byte, error)
}
......@@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
return buf[:n], nil
}
// Reader returns the underlying io.Reader this splitter consumes from,
// satisfying the Splitter interface.
func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
package helpers
import (
"io"
"os"
"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
......@@ -17,6 +21,8 @@ type DagBuilderHelper struct {
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
fullPath string
stat os.FileInfo
}
type DagBuilderParams struct {
......@@ -34,13 +40,18 @@ type DagBuilderParams struct {
// Generate a new DagBuilderHelper from the given params, which data source comes
// from chunks object
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
db := &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
if fi, ok := spl.Reader().(files.FileInfo); ok {
db.fullPath = fi.FullPath()
db.stat = fi.Stat()
}
return db
}
// prepareNext consumes the next item from the splitter and puts it
......@@ -48,12 +59,14 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
// it will do nothing.
func (db *DagBuilderHelper) prepareNext() {
// if we already have data waiting to be consumed, we're ready
if db.nextData != nil {
if db.nextData != nil || db.recvdErr != nil {
return
}
// TODO: handle err (which wasn't handled either when the splitter was channeled)
db.nextData, _ = db.spl.NextBytes()
db.nextData, db.recvdErr = db.spl.NextBytes()
if db.recvdErr == io.EOF {
db.recvdErr = nil
}
}
// Done returns whether or not we're done consuming the incoming data.
......@@ -61,17 +74,24 @@ func (db *DagBuilderHelper) Done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
if db.recvdErr != nil {
return false
}
return db.nextData == nil
}
// Next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *DagBuilderHelper) Next() []byte {
func (db *DagBuilderHelper) Next() ([]byte, error) {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
if db.recvdErr != nil {
return nil, db.recvdErr
} else {
return d, nil
}
}
// GetDagServ returns the dagservice object this Helper is using
......@@ -100,7 +120,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
}
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data := db.Next()
data, err := db.Next()
if err != nil {
return nil, err
}
if data == nil { // we're done!
return nil, nil
}
......@@ -121,6 +145,12 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
}
}
// SetPosInfo records on node the byte offset within the source file,
// together with the file's path and stat captured when the helper was
// built. It is a no-op when the data source is not a real on-disk file
// (db.stat is nil, i.e. the splitter's reader was not a files.FileInfo).
func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.stat != nil {
node.SetPosInfo(offset, db.fullPath, db.stat)
}
}
func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
......
......@@ -3,9 +3,11 @@ package helpers
import (
"context"
"fmt"
"os"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
ft "github.com/ipfs/go-ipfs/unixfs"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
......@@ -43,6 +45,7 @@ type UnixfsNode struct {
rawnode *dag.RawNode
node *dag.ProtoNode
ufmt *ft.FSNode
posInfo *pi.PosInfo
}
// NewUnixfsNode creates a new Unixfs node to represent a file
......@@ -144,9 +147,29 @@ func (n *UnixfsNode) FileSize() uint64 {
return n.ufmt.FileSize()
}
// SetPosInfo attaches position information — the byte offset within the
// source file, the file's full path, and its os.FileInfo — to this node.
// GetDagNode later uses it to wrap the result in a pi.FilestoreNode so
// the filestore can locate the data on disk.
func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
	// Keyed fields instead of a positional literal: go vet's composites
	// check flags unkeyed literals of imported structs, and keys keep
	// this correct if pi.PosInfo's field order ever changes.
	n.posInfo = &pi.PosInfo{
		Offset:   offset,
		FullPath: fullPath,
		Stat:     stat,
	}
}
// getDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
// GetDagNode produces the finished DAG node for this unixfs node. When
// position information has been recorded via SetPosInfo, the base node
// is wrapped in a pi.FilestoreNode carrying that PosInfo; otherwise the
// base node is returned unwrapped.
func (n *UnixfsNode) GetDagNode() (node.Node, error) {
	base, err := n.getBaseDagNode()
	if err != nil {
		return nil, err
	}
	if n.posInfo == nil {
		// no filestore metadata attached; hand back the plain node
		return base, nil
	}
	return &pi.FilestoreNode{
		Node:    base,
		PosInfo: n.posInfo,
	}, nil
}
func (n *UnixfsNode) getBaseDagNode() (node.Node, error) {
if n.raw {
return n.rawnode, nil
}
......
package posinfo
import (
"os"
node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
)
// PosInfo describes where a block's data lives within a file on disk.
type PosInfo struct {
Offset uint64 // byte offset of the data within the file
FullPath string // absolute path of the backing file
Stat os.FileInfo // can be nil
}
// FilestoreNode wraps an IPLD node together with the PosInfo the
// filestore needs to reference the node's data in its original file
// instead of duplicating it in the blockstore.
type FilestoreNode struct {
node.Node
PosInfo *PosInfo
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment