importer.go 8.11 KB
Newer Older
1 2
// Package importer implements utilities used to create IPFS DAGs from files
// and readers.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package importer

import (
Jeromy's avatar
Jeromy committed
6
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7 8
	"fmt"
	"io"
9
	"os"
10

11
	"github.com/jbenet/go-ipfs/importer/chunk"
12
	dag "github.com/jbenet/go-ipfs/merkledag"
13
	"github.com/jbenet/go-ipfs/pin"
14
	ft "github.com/jbenet/go-ipfs/unixfs"
15
	"github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19
// log is the package-level logger, obtained from util.Logger under the
// name "importer".
var log = util.Logger("importer")

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
20
// BlockSizeLimit specifies the maximum size an imported block can have.
Jeromy's avatar
Jeromy committed
21 22
var BlockSizeLimit = 1 << 20 // 1 MiB (1048576 bytes)

23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
// Rough estimates of expected sizes, in bytes. roughLinkBlockSize and
// roughLinkSize feed the DefaultLinksPerBlock calculation; roughDataBlockSize
// mirrors the chunker's default block size.
var roughDataBlockSize = chunk.DefaultBlockSize
var roughLinkBlockSize = 1 << 13 // 8KB
var roughLinkSize = 258 + 8 + 5  // sha256 multihash + size + no name + protobuf framing (271 bytes total)

// DefaultLinksPerBlock governs how the importer decides how many links there
// will be per block. This calculation is based on:
//  * the expected distribution of block sizes
//  * the expected distribution of link sizes
//  * desired access speed
// For now, we use:
//
//   var roughLinkBlockSize = 1 << 13 // 8KB
//   var roughLinkSize = 258 + 8 + 5  // sha256 multihash + size + no name + protobuf framing
//   var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
//
// which, with integer division, works out to 8192 / 271 = 30 links per block.
//
// See calc_test.go
var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
41 42 43

// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit.
// It is returned when a chunk delivered by the splitter is longer than
// BlockSizeLimit bytes.
var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44

Jeromy's avatar
Jeromy committed
45 46 47 48 49 50 51 52 53 54
// IndirectBlockDataSize governs whether indirect blocks should copy over
// data from their first child, and how much. If this is 0, indirect blocks
// have no data, only links. If this is larger, indirect blocks will copy
// as much as (maybe less than) this many bytes.
//
// This number should be <= (BlockSizeLimit - (DefaultLinksPerBlock * LinkSize)).
// Note that it is not known here what the LinkSize is, because the hash function
// could vary wildly in size. Exercise caution when setting this option. For
// safety, it will be clipped to (BlockSizeLimit - (DefaultLinksPerBlock * 256))
// and to a minimum of 0 before it is used.
var IndirectBlockDataSize = 0
55

Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62
// this check is here to ensure the conditions on IndirectBlockDataSize hold.
// returns int because it will be used as an input to `make()` later on. if
// `int` will flip over to negative, better know here.
func defaultIndirectBlockDataSize() int {
	max := BlockSizeLimit - (DefaultLinksPerBlock * 256)
	if IndirectBlockDataSize < max {
		max = IndirectBlockDataSize
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
	}
Jeromy's avatar
Jeromy committed
64 65
	if max < 0 {
		return 0
Jeromy's avatar
Jeromy committed
66
	}
Jeromy's avatar
Jeromy committed
67
	return max
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
}
69

Jeromy's avatar
Jeromy committed
70 71 72
// Builds a DAG from the given file, writing created blocks to disk as they are
// created
func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) {
73 74 75 76 77 78
	stat, err := os.Stat(fpath)
	if err != nil {
		return nil, err
	}

	if stat.IsDir() {
79
		return nil, fmt.Errorf("`%s` is a directory", fpath)
80 81 82 83 84 85 86 87
	}

	f, err := os.Open(fpath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

Jeromy's avatar
Jeromy committed
88
	return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
89
}
90

Jeromy's avatar
Jeromy committed
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
// unixfsNode is a struct created to aid in the generation
// of unixfs DAG trees. It pairs a DAG node with the unixfs metadata that
// will be serialized into that node's Data field (see getDagNode).
type unixfsNode struct {
	// node is the DAG node being built; links to children are added to it.
	node *dag.Node
	// ufmt holds the unixfs data and child block sizes for this node.
	ufmt *ft.MultiBlock
}

// newUnixfsNode returns a unixfsNode whose DAG node and unixfs metadata are
// both freshly allocated zero values.
func newUnixfsNode() *unixfsNode {
	fresh := new(unixfsNode)
	fresh.node = &dag.Node{}
	fresh.ufmt = &ft.MultiBlock{}
	return fresh
}

// numChildren returns the number of children of this node as reported by
// its unixfs metadata (ufmt.NumChildren).
// NOTE(review): presumably one per AddBlockSize call made in addChild —
// confirm against the unixfs package.
func (n *unixfsNode) numChildren() int {
	return n.ufmt.NumChildren()
}

// addChild will add the given unixfsNode as a child of the receiver.
// the passed in dagBuilderHelper is used to store the child node an
// pin it locally so it doesnt get lost
func (n *unixfsNode) addChild(child *unixfsNode, db *dagBuilderHelper) error {
	n.ufmt.AddBlockSize(child.ufmt.FileSize())

	childnode, err := child.getDagNode()
116
	if err != nil {
Jeromy's avatar
Jeromy committed
117
		return err
118 119
	}

Jeromy's avatar
Jeromy committed
120 121 122 123 124
	// Add a link to this node without storing a reference to the memory
	// This way, we avoid nodes building up and consuming all of our RAM
	err = n.node.AddNodeLinkClean("", childnode)
	if err != nil {
		return err
125 126
	}

Jeromy's avatar
Jeromy committed
127
	childkey, err := db.dserv.Add(childnode)
128
	if err != nil {
Jeromy's avatar
Jeromy committed
129
		return err
130 131
	}

Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
	// Pin the child node indirectly
	if db.mp != nil {
		db.mp.PinWithMode(childkey, pin.Indirect)
	}

	return nil
}

// setData sets data as the unixfs payload of this node. The slice is stored
// as-is; no copy is made.
func (n *unixfsNode) setData(data []byte) {
	n.ufmt.Data = data
}

// getDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
func (n *unixfsNode) getDagNode() (*dag.Node, error) {
	data, err := n.ufmt.GetBytes()
	if err != nil {
		return nil, err
	}
	n.node.Data = data
	return n.node, nil
153 154
}

155
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
Jeromy's avatar
Jeromy committed
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
	// Start the splitter
	blkch := spl.Split(r)

	// Create our builder helper
	db := &dagBuilderHelper{
		dserv:    ds,
		mp:       mp,
		in:       blkch,
		maxlinks: DefaultLinksPerBlock,
		indrSize: defaultIndirectBlockDataSize(),
	}

	var root *unixfsNode
	for level := 0; !db.done(); level++ {

		nroot := newUnixfsNode()
172

Jeromy's avatar
Jeromy committed
173 174 175 176 177
		// add our old root as a child of the new root.
		if root != nil { // nil if it's the first node.
			if err := nroot.addChild(root, db); err != nil {
				return nil, err
			}
Jeromy's avatar
Jeromy committed
178 179
		}

Jeromy's avatar
Jeromy committed
180 181
		// fill it up.
		if err := db.fillNodeRec(nroot, level); err != nil {
182 183
			return nil, err
		}
Jeromy's avatar
Jeromy committed
184 185 186 187 188

		root = nroot
	}
	if root == nil {
		root = newUnixfsNode()
189 190
	}

Jeromy's avatar
Jeromy committed
191
	rootnode, err := root.getDagNode()
192 193 194
	if err != nil {
		return nil, err
	}
195

Jeromy's avatar
Jeromy committed
196
	rootkey, err := ds.Add(rootnode)
197 198 199
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
200

201
	if mp != nil {
Jeromy's avatar
Jeromy committed
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
		mp.PinWithMode(rootkey, pin.Recursive)
	}

	return root.getDagNode()
}

// dagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type dagBuilderHelper struct {
	// dserv is the service through which built nodes are stored.
	dserv    dag.DAGService
	// mp pins stored child nodes; may be nil, in which case nothing is pinned.
	mp       pin.ManualPinner
	// in delivers the data chunks to place in the dag; nil means "no input".
	in       <-chan []byte
	nextData []byte // the next chunk to return; nil when nothing is buffered.
	// maxlinks is the maximum number of children per node.
	maxlinks int
	indrSize int // see IndirectBlockDataSize
}

// prepareNext makes sure the next chunk from the input channel, if any, is
// buffered in nextData. It is idempotent: while a chunk is already buffered
// it does nothing.
//
// Buffering one chunk ahead lets callers "peek" at whether the input is
// exhausted without consuming anything, which makes building the dag a lot
// easier.
func (db *dagBuilderHelper) prepareNext() {
	// A nil input means there is nothing to do, as if there were no data at
	// all (a sort of zero value). A non-nil nextData means a chunk is
	// already waiting to be consumed.
	if db.in == nil || db.nextData != nil {
		return
	}

	// Receiving from a closed channel yields nil, which leaves nextData nil
	// and thereby signals that the input is exhausted.
	db.nextData = <-db.in
}

// done returns whether or not we're done consuming the incoming data.
// It may block while waiting for the next chunk to arrive on the input
// channel.
func (db *dagBuilderHelper) done() bool {
	// done may be called before next, so make sure the next chunk (or the
	// end of input) has been observed before inspecting nextData.
	db.prepareNext() // idempotent
	return db.nextData == nil
}

// next hands out the next chunk of data to be inserted into the dag and
// clears the internal buffer so that the following call fetches a new chunk.
// A nil result means the stream is at an end and the current building
// operation should finish.
func (db *dagBuilderHelper) next() []byte {
	db.prepareNext() // idempotent

	// Take the buffered chunk and mark it as consumed in one step.
	var out []byte
	out, db.nextData = db.nextData, nil
	return out
}

// fillNodeRec will fill the given node with data from the dagBuilders input
// source down to an indirection depth as specified by 'depth'
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *dagBuilderHelper) fillNodeRec(node *unixfsNode, depth int) error {
	if depth < 0 {
		return errors.New("attempt to fillNode at depth < 0")
	}

	// Base case
	if depth <= 0 { // catch accidental -1's in case error above is removed.
		return db.fillNodeWithData(node)
273 274
	}

Jeromy's avatar
Jeromy committed
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
	// while we have room AND we're not done
	for node.numChildren() < db.maxlinks && !db.done() {
		child := newUnixfsNode()

		if err := db.fillNodeRec(child, depth-1); err != nil {
			return err
		}

		if err := node.addChild(child, db); err != nil {
			return err
		}
	}

	return nil
}

// fillNodeWithData consumes the next chunk from the input and sets it as the
// data of node. If the input is exhausted the node is left unchanged and nil
// is returned. A chunk longer than BlockSizeLimit is consumed but not stored,
// and ErrSizeLimitExceeded is returned.
func (db *dagBuilderHelper) fillNodeWithData(node *unixfsNode) error {
	if payload := db.next(); payload != nil {
		if len(payload) > BlockSizeLimit {
			return ErrSizeLimitExceeded
		}
		node.setData(payload)
	}
	// Either the data was stored or there was none left: we're done.
	return nil
}

// min returns the smaller of a and b.
//
// (why is intmin not in math?)
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}