trickledag.go 10.2 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Package trickle allows to build trickle DAGs.
// In this type of DAG, non-leaf nodes are first filled
// with data leaves, and then incorporate "layers" of subtrees
// as additional links.
//
// Each layer is a trickle sub-tree and is limited by an increasing
// maximum depth. Thus, the first layer of a node
// can only hold leaves (depth 1) but subsequent layers can grow deeper.
// By default, this module places 4 nodes per layer (that is, 4 subtrees
// of the same maximum depth before increasing it).
//
// Trickle DAGs are very good for sequentially reading data, as the
// first data leaves are directly reachable from the root and those
// coming next are always nearby. They are
// suited for things like streaming applications.
package trickle

import (
	"context"
	"errors"
	"fmt"

Jeromy's avatar
Jeromy committed
23 24
	ft "github.com/ipfs/go-unixfs"
	h "github.com/ipfs/go-unixfs/importer/helpers"
Jeromy's avatar
Jeromy committed
25

Jeromy's avatar
Jeromy committed
26 27 28
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	dag "github.com/ipfs/go-merkledag"
Jeromy's avatar
Jeromy committed
29 30 31 32 33 34 35 36 37 38 39
)

// layerRepeat specifies how many times to append a child tree of a
// given depth. Higher values increase the width of a given node, which
// improves seek speeds.
//
// The trickleDepthInfo helpers also rely on it to translate the number of
// children of a node back into a depth and a position inside the current
// layer.
const layerRepeat = 4

// Layout builds a new DAG with the trickle format using the provided
// DagBuilderHelper. See the module's description for a more detailed
// explanation.
func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
40 41 42
	newRoot := db.NewFSNodeOverDag(ft.TFile)
	root, _, err := fillTrickleRecFSNode(db, newRoot, -1)
	if err != nil {
Jeromy's avatar
Jeromy committed
43 44 45
		return nil, err
	}

46
	return root, db.Add(root)
Jeromy's avatar
Jeromy committed
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
}

// fillTrickleRec builds a trickle (sub-)tree rooted at `node`.
//
// When maxDepth is greater than zero the tree stops growing once that depth
// is reached; otherwise it keeps growing until the DAG builder reports that
// no data is left.
func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, maxDepth int) error {
	// The direct layer is filled unconditionally, base case included.
	err := db.FillNodeLayer(node)
	if err != nil {
		return err
	}

	// The depth limit only applies when the parameter is set (> 0).
	limited := maxDepth > 0
	for depth := 1; !limited || depth != maxDepth; depth++ {
		for i := 0; i < layerRepeat; i++ {
			if db.Done() {
				return nil
			}

			subtree := db.NewUnixfsNode()
			if err = fillTrickleRec(db, subtree, depth); err != nil {
				return err
			}

			if err = node.AddChild(subtree, db); err != nil {
				return err
			}
		}
	}
	return nil
}

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// fillTrickleRecFSNode builds a trickle (sub-)tree rooted at `node` and
// returns the committed node together with the file size it reports.
//
// When maxDepth is greater than zero the tree stops growing once that depth
// is reached; otherwise it keeps growing until the DAG builder reports that
// no data is left.
func fillTrickleRecFSNode(db *h.DagBuilderHelper, node *h.FSNodeOverDag, maxDepth int) (filledNode ipld.Node, nodeFileSize uint64, err error) {
	// The direct layer is filled unconditionally, base case included.
	if err = db.FillFSNodeLayer(node); err != nil {
		return nil, 0, err
	}

	// The depth limit only applies when the parameter is set (> 0).
	depthLimited := maxDepth > 0
	for depth := 1; !db.Done() && !(depthLimited && depth == maxDepth); depth++ {
		// Add up to layerRepeat subtrees of the current depth.
		for i := 0; i < layerRepeat && !db.Done(); i++ {
			subtree, subtreeSize, fillErr := fillTrickleRecFSNode(db, db.NewFSNodeOverDag(ft.TFile), depth)
			if fillErr != nil {
				return nil, 0, fillErr
			}

			if addErr := node.AddChild(subtree, subtreeSize, db); addErr != nil {
				return nil, 0, addErr
			}
		}
	}

	// The size is read before committing, then the node is committed to
	// obtain the final ipld.Node.
	nodeFileSize = node.FileSize()
	if filledNode, err = node.Commit(); err != nil {
		return nil, 0, err
	}

	return filledNode, nodeFileSize, nil
}

Jeromy's avatar
Jeromy committed
121 122 123 124 125 126 127 128
// Append appends the data in `db` to the dag, using the Trickledag format
func Append(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, errOut error) {
	base, ok := basen.(*dag.ProtoNode)
	if !ok {
		return nil, dag.ErrNotProtobuf
	}

	// Convert to unixfs node for working with easily
129 130

	fsn, err := h.NewFSNFromDag(base)
Jeromy's avatar
Jeromy committed
131 132 133 134 135
	if err != nil {
		return nil, err
	}

	// Get depth of this 'tree'
136
	n, layerProgress := trickleDepthInfoFSNode(fsn, db.Maxlinks())
Jeromy's avatar
Jeromy committed
137 138
	if n == 0 {
		// If direct blocks not filled...
139
		if err := db.FillFSNodeLayer(fsn); err != nil {
Jeromy's avatar
Jeromy committed
140 141 142 143
			return nil, err
		}

		if db.Done() {
144
			return fsn.GetDagNode()
Jeromy's avatar
Jeromy committed
145 146 147 148 149 150 151
		}

		// If continuing, our depth has increased by one
		n++
	}

	// Last child in this node may not be a full tree, lets file it up
152
	if err := appendFillLastChild(ctx, fsn, n-1, layerProgress, db); err != nil {
Jeromy's avatar
Jeromy committed
153 154 155 156 157 158 159 160 161 162 163
		return nil, err
	}

	// after appendFillLastChild, our depth is now increased by one
	if !db.Done() {
		n++
	}

	// Now, continue filling out tree like normal
	for i := n; !db.Done(); i++ {
		for j := 0; j < layerRepeat && !db.Done(); j++ {
164 165
			nextChild := db.NewFSNodeOverDag(ft.TFile)
			childNode, childFileSize, err := fillTrickleRecFSNode(db, nextChild, i)
Jeromy's avatar
Jeromy committed
166 167 168
			if err != nil {
				return nil, err
			}
169
			err = fsn.AddChild(childNode, childFileSize, db)
Jeromy's avatar
Jeromy committed
170 171 172 173 174
			if err != nil {
				return nil, err
			}
		}
	}
175 176 177 178 179
	_, err = fsn.Commit()
	if err != nil {
		return nil, err
	}
	return fsn.GetDagNode()
Jeromy's avatar
Jeromy committed
180 181
}

182 183
func appendFillLastChild(ctx context.Context, fsn *h.FSNodeOverDag, depth int, layerFill int, db *h.DagBuilderHelper) error {
	if fsn.NumChildren() <= db.Maxlinks() {
Jeromy's avatar
Jeromy committed
184 185 186
		return nil
	}
	// Recursive step, grab last child
187 188
	last := fsn.NumChildren() - 1
	lastChild, err := fsn.GetChild(ctx, last, db.GetDagServ())
Jeromy's avatar
Jeromy committed
189 190 191 192 193
	if err != nil {
		return err
	}

	// Fill out last child (may not be full tree)
194
	nchild, nchildSize, err := appendRec(ctx, lastChild, db, depth-1)
Jeromy's avatar
Jeromy committed
195 196 197 198 199
	if err != nil {
		return err
	}

	// Update changed child in parent node
200 201 202 203 204 205
	fsn.RemoveChild(last, db)
	filledNode, err := nchild.Commit()
	if err != nil {
		return err
	}
	err = fsn.AddChild(filledNode, nchildSize, db)
Jeromy's avatar
Jeromy committed
206 207 208 209 210 211 212
	if err != nil {
		return err
	}

	// Partially filled depth layer
	if layerFill != 0 {
		for ; layerFill < layerRepeat && !db.Done(); layerFill++ {
213 214
			nextChild := db.NewFSNodeOverDag(ft.TFile)
			childNode, childFileSize, err := fillTrickleRecFSNode(db, nextChild, depth)
Jeromy's avatar
Jeromy committed
215 216 217 218
			if err != nil {
				return err
			}

219
			if err := fsn.AddChild(childNode, childFileSize, db); err != nil {
Jeromy's avatar
Jeromy committed
220 221 222 223 224 225 226 227 228
				return err
			}
		}
	}

	return nil
}

// recursive call for Append
229
func appendRec(ctx context.Context, fsn *h.FSNodeOverDag, db *h.DagBuilderHelper, depth int) (*h.FSNodeOverDag, uint64, error) {
Jeromy's avatar
Jeromy committed
230
	if depth == 0 || db.Done() {
231
		return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
232 233 234
	}

	// Get depth of this 'tree'
235
	n, layerProgress := trickleDepthInfoFSNode(fsn, db.Maxlinks())
Jeromy's avatar
Jeromy committed
236 237
	if n == 0 {
		// If direct blocks not filled...
238 239
		if err := db.FillFSNodeLayer(fsn); err != nil {
			return nil, 0, err
Jeromy's avatar
Jeromy committed
240 241 242 243 244 245
		}
		n++
	}

	// If at correct depth, no need to continue
	if n == depth {
246
		return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
247 248
	}

249 250
	if err := appendFillLastChild(ctx, fsn, n, layerProgress, db); err != nil {
		return nil, 0, err
Jeromy's avatar
Jeromy committed
251 252 253 254 255 256 257 258 259 260
	}

	// after appendFillLastChild, our depth is now increased by one
	if !db.Done() {
		n++
	}

	// Now, continue filling out tree like normal
	for i := n; i < depth && !db.Done(); i++ {
		for j := 0; j < layerRepeat && !db.Done(); j++ {
261 262 263 264
			nextChild := db.NewFSNodeOverDag(ft.TFile)
			childNode, childFileSize, err := fillTrickleRecFSNode(db, nextChild, i)
			if err != nil {
				return nil, 0, err
Jeromy's avatar
Jeromy committed
265 266
			}

267 268
			if err := fsn.AddChild(childNode, childFileSize, db); err != nil {
				return nil, 0, err
Jeromy's avatar
Jeromy committed
269 270 271 272
			}
		}
	}

273
	return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
274 275 276 277 278 279 280 281 282 283
}
// trickleDepthInfo derives two values from the number of children of `node`:
// the depth reached by the node and the number of subtrees already present
// in its current layer. Both are zero while the node has fewer than
// `maxlinks` children.
func trickleDepthInfo(node *h.UnixfsNode, maxlinks int) (int, int) {
	children := node.NumChildren()
	if children >= maxlinks {
		// Children past the first `maxlinks` come in groups of layerRepeat,
		// one group per depth.
		extra := children - maxlinks
		return extra/layerRepeat + 1, extra % layerRepeat
	}

	return 0, 0
}

284 285 286 287 288 289 290 291 292
// trickleDepthInfoFSNode derives two values from the number of children of
// `node`: the depth reached by the node and the number of subtrees already
// present in its current layer. Both are zero while the node has fewer than
// `maxlinks` children.
func trickleDepthInfoFSNode(node *h.FSNodeOverDag, maxlinks int) (int, int) {
	children := node.NumChildren()
	if children >= maxlinks {
		// Children past the first `maxlinks` come in groups of layerRepeat,
		// one group per depth.
		extra := children - maxlinks
		return extra/layerRepeat + 1, extra % layerRepeat
	}

	return 0, 0
}

Jeromy's avatar
Jeromy committed
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
// VerifyParams is used by VerifyTrickleDagStructure
type VerifyParams struct {
	// Getter is used to fetch the children linked from each node.
	Getter      ipld.NodeGetter
	// Direct is the number of leading links of a node that are verified as
	// direct (leaf) blocks.
	Direct      int
	// LayerRepeat is the number of consecutive subtree links that share the
	// same depth: the link at index i is verified with depth
	// ((i - Direct) / LayerRepeat) + 1.
	LayerRepeat int
	// Prefix, when not nil, is the CID prefix every node is compared against.
	// The codec is adjusted per node, version 0 is raised to 1 for raw
	// leaves, and an MhLength of -1 accepts any length.
	Prefix      *cid.Prefix
	// RawLeaves states whether leaves must be raw nodes (true) or protobuf
	// nodes (false).
	RawLeaves   bool
}

// VerifyTrickleDagStructure reports an error unless the DAG rooted at `nd`
// matches the trickle DAG layout described by `p`.
func VerifyTrickleDagStructure(nd ipld.Node, p VerifyParams) error {
	// A negative depth leaves the root without a depth bound.
	const unboundedDepth = -1
	return verifyTDagRec(nd, unboundedDepth, p)
}

// Recursive call for verifying the structure of a trickledag
func verifyTDagRec(n ipld.Node, depth int, p VerifyParams) error {
	codec := cid.DagProtobuf
	if depth == 0 {
		if len(n.Links()) > 0 {
			return errors.New("expected direct block")
		}
		// zero depth dag is raw data block
		switch nd := n.(type) {
		case *dag.ProtoNode:
Overbool's avatar
Overbool committed
318
			fsn, err := ft.FSNodeFromBytes(nd.Data())
Jeromy's avatar
Jeromy committed
319 320 321 322
			if err != nil {
				return err
			}

Overbool's avatar
Overbool committed
323
			if fsn.Type() != ft.TRaw {
Jeromy's avatar
Jeromy committed
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
				return errors.New("expected raw block")
			}

			if p.RawLeaves {
				return errors.New("expected raw leaf, got a protobuf node")
			}
		case *dag.RawNode:
			if !p.RawLeaves {
				return errors.New("expected protobuf node as leaf")
			}
			codec = cid.Raw
		default:
			return errors.New("expected ProtoNode or RawNode")
		}
	}

	// verify prefix
	if p.Prefix != nil {
		prefix := n.Cid().Prefix()
		expect := *p.Prefix // make a copy
		expect.Codec = uint64(codec)
		if codec == cid.Raw && expect.Version == 0 {
			expect.Version = 1
		}
		if expect.MhLength == -1 {
			expect.MhLength = prefix.MhLength
		}
		if prefix != expect {
			return fmt.Errorf("unexpected cid prefix: expected: %v; got %v", expect, prefix)
		}
	}

	if depth == 0 {
		return nil
	}

	nd, ok := n.(*dag.ProtoNode)
	if !ok {
		return errors.New("expected ProtoNode")
	}

	// Verify this is a branch node
Overbool's avatar
Overbool committed
366
	fsn, err := ft.FSNodeFromBytes(nd.Data())
Jeromy's avatar
Jeromy committed
367 368 369 370
	if err != nil {
		return err
	}

Overbool's avatar
Overbool committed
371 372
	if fsn.Type() != ft.TFile {
		return fmt.Errorf("expected file as branch node, got: %s", fsn.Type())
Jeromy's avatar
Jeromy committed
373 374
	}

Overbool's avatar
Overbool committed
375
	if len(fsn.Data()) > 0 {
Jeromy's avatar
Jeromy committed
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
		return errors.New("branch node should not have data")
	}

	for i := 0; i < len(nd.Links()); i++ {
		child, err := nd.Links()[i].GetNode(context.TODO(), p.Getter)
		if err != nil {
			return err
		}

		if i < p.Direct {
			// Direct blocks
			err := verifyTDagRec(child, 0, p)
			if err != nil {
				return err
			}
		} else {
			// Recursive trickle dags
			rdepth := ((i - p.Direct) / p.LayerRepeat) + 1
			if rdepth >= depth && depth > 0 {
				return errors.New("child dag was too deep")
			}
			err := verifyTDagRec(child, rdepth, p)
			if err != nil {
				return err
			}
		}
	}
	return nil
}