trickledag.go 9.05 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Package trickle builds trickle DAGs.
// In this type of DAG, non-leaf nodes are first filled
// with data leaves, and then incorporate "layers" of subtrees
// as additional links.
//
// Each layer is made of trickle sub-trees limited by an increasing
// maximum depth. Thus, the subtrees in a node's first layer
// can only hold leaves (depth 1), but subsequent layers can grow deeper.
// By default, this package places 4 subtrees per layer (that is, 4 subtrees
// of the same maximum depth before increasing it).
//
// Trickle DAGs are very good for sequentially reading data, as the
// first data leaves are directly reachable from the root and those
// coming next are always nearby. They are
// suited for things like streaming applications.
package trickle

import (
	"context"
	"errors"
	"fmt"

Jeromy's avatar
Jeromy committed
23 24
	ft "github.com/ipfs/go-unixfs"
	h "github.com/ipfs/go-unixfs/importer/helpers"
Jeromy's avatar
Jeromy committed
25

Jeromy's avatar
Jeromy committed
26 27 28
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	dag "github.com/ipfs/go-merkledag"
Jeromy's avatar
Jeromy committed
29 30 31 32 33 34 35 36 37 38 39
)

// layerRepeat is how many sibling subtrees sharing one maximum depth are
// appended to a node before that maximum depth grows by one. Higher values
// make each node wider, which improves seek speeds.
const (
	layerRepeat = 4
)

// Layout builds a new DAG with the trickle format using the provided
// DagBuilderHelper. See the module's description for a more detailed
// explanation.
func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
40
	newRoot := db.NewFSNodeOverDag(ft.TFile)
Bamvor Zhang's avatar
Bamvor Zhang committed
41
	root, _, err := fillTrickleRec(db, newRoot, -1)
42
	if err != nil {
Jeromy's avatar
Jeromy committed
43 44 45
		return nil, err
	}

46
	return root, db.Add(root)
Jeromy's avatar
Jeromy committed
47 48 49 50 51
}

// fillTrickleRec creates a trickle (sub-)tree with an optional maximum specified depth
// in the case maxDepth is greater than zero, or with unlimited depth otherwise
// (where the DAG builder will signal the end of data to end the function).
Bamvor Zhang's avatar
Bamvor Zhang committed
52
func fillTrickleRec(db *h.DagBuilderHelper, node *h.FSNodeOverDag, maxDepth int) (filledNode ipld.Node, nodeFileSize uint64, err error) {
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
	// Always do this, even in the base case
	if err := db.FillFSNodeLayer(node); err != nil {
		return nil, 0, err
	}

	for depth := 1; ; depth++ {
		// Apply depth limit only if the parameter is set (> 0).
		if db.Done() || (maxDepth > 0 && depth == maxDepth) {
			break
		}
		for layer := 0; layer < layerRepeat; layer++ {
			if db.Done() {
				break
			}

			nextChild := db.NewFSNodeOverDag(ft.TFile)
Bamvor Zhang's avatar
Bamvor Zhang committed
69
			childNode, childFileSize, err := fillTrickleRec(db, nextChild, depth)
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
			if err != nil {
				return nil, 0, err
			}

			if err := node.AddChild(childNode, childFileSize, db); err != nil {
				return nil, 0, err
			}
		}
	}
	nodeFileSize = node.FileSize()

	// Get the final `dag.ProtoNode` with the `FSNode` data encoded inside.
	filledNode, err = node.Commit()
	if err != nil {
		return nil, 0, err
	}

	return filledNode, nodeFileSize, nil
}

Jeromy's avatar
Jeromy committed
90 91 92 93 94 95 96 97
// Append appends the data in `db` to the dag, using the Trickledag format
func Append(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, errOut error) {
	base, ok := basen.(*dag.ProtoNode)
	if !ok {
		return nil, dag.ErrNotProtobuf
	}

	// Convert to unixfs node for working with easily
98 99

	fsn, err := h.NewFSNFromDag(base)
Jeromy's avatar
Jeromy committed
100 101 102 103 104
	if err != nil {
		return nil, err
	}

	// Get depth of this 'tree'
Bamvor Zhang's avatar
Bamvor Zhang committed
105
	n, layerProgress := trickleDepthInfo(fsn, db.Maxlinks())
Jeromy's avatar
Jeromy committed
106 107
	if n == 0 {
		// If direct blocks not filled...
108
		if err := db.FillFSNodeLayer(fsn); err != nil {
Jeromy's avatar
Jeromy committed
109 110 111 112
			return nil, err
		}

		if db.Done() {
113
			return fsn.GetDagNode()
Jeromy's avatar
Jeromy committed
114 115 116 117 118 119 120
		}

		// If continuing, our depth has increased by one
		n++
	}

	// Last child in this node may not be a full tree, lets file it up
121
	if err := appendFillLastChild(ctx, fsn, n-1, layerProgress, db); err != nil {
Jeromy's avatar
Jeromy committed
122 123 124 125 126 127 128 129 130 131 132
		return nil, err
	}

	// after appendFillLastChild, our depth is now increased by one
	if !db.Done() {
		n++
	}

	// Now, continue filling out tree like normal
	for i := n; !db.Done(); i++ {
		for j := 0; j < layerRepeat && !db.Done(); j++ {
133
			nextChild := db.NewFSNodeOverDag(ft.TFile)
Bamvor Zhang's avatar
Bamvor Zhang committed
134
			childNode, childFileSize, err := fillTrickleRec(db, nextChild, i)
Jeromy's avatar
Jeromy committed
135 136 137
			if err != nil {
				return nil, err
			}
138
			err = fsn.AddChild(childNode, childFileSize, db)
Jeromy's avatar
Jeromy committed
139 140 141 142 143
			if err != nil {
				return nil, err
			}
		}
	}
144 145 146 147 148
	_, err = fsn.Commit()
	if err != nil {
		return nil, err
	}
	return fsn.GetDagNode()
Jeromy's avatar
Jeromy committed
149 150
}

151 152
func appendFillLastChild(ctx context.Context, fsn *h.FSNodeOverDag, depth int, layerFill int, db *h.DagBuilderHelper) error {
	if fsn.NumChildren() <= db.Maxlinks() {
Jeromy's avatar
Jeromy committed
153 154 155
		return nil
	}
	// Recursive step, grab last child
156 157
	last := fsn.NumChildren() - 1
	lastChild, err := fsn.GetChild(ctx, last, db.GetDagServ())
Jeromy's avatar
Jeromy committed
158 159 160 161 162
	if err != nil {
		return err
	}

	// Fill out last child (may not be full tree)
163
	nchild, nchildSize, err := appendRec(ctx, lastChild, db, depth-1)
Jeromy's avatar
Jeromy committed
164 165 166 167 168
	if err != nil {
		return err
	}

	// Update changed child in parent node
169 170 171 172 173 174
	fsn.RemoveChild(last, db)
	filledNode, err := nchild.Commit()
	if err != nil {
		return err
	}
	err = fsn.AddChild(filledNode, nchildSize, db)
Jeromy's avatar
Jeromy committed
175 176 177 178 179 180 181
	if err != nil {
		return err
	}

	// Partially filled depth layer
	if layerFill != 0 {
		for ; layerFill < layerRepeat && !db.Done(); layerFill++ {
182
			nextChild := db.NewFSNodeOverDag(ft.TFile)
Bamvor Zhang's avatar
Bamvor Zhang committed
183
			childNode, childFileSize, err := fillTrickleRec(db, nextChild, depth)
Jeromy's avatar
Jeromy committed
184 185 186 187
			if err != nil {
				return err
			}

188
			if err := fsn.AddChild(childNode, childFileSize, db); err != nil {
Jeromy's avatar
Jeromy committed
189 190 191 192 193 194 195 196 197
				return err
			}
		}
	}

	return nil
}

// recursive call for Append
198
func appendRec(ctx context.Context, fsn *h.FSNodeOverDag, db *h.DagBuilderHelper, depth int) (*h.FSNodeOverDag, uint64, error) {
Jeromy's avatar
Jeromy committed
199
	if depth == 0 || db.Done() {
200
		return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
201 202 203
	}

	// Get depth of this 'tree'
Bamvor Zhang's avatar
Bamvor Zhang committed
204
	n, layerProgress := trickleDepthInfo(fsn, db.Maxlinks())
Jeromy's avatar
Jeromy committed
205 206
	if n == 0 {
		// If direct blocks not filled...
207 208
		if err := db.FillFSNodeLayer(fsn); err != nil {
			return nil, 0, err
Jeromy's avatar
Jeromy committed
209 210 211 212 213 214
		}
		n++
	}

	// If at correct depth, no need to continue
	if n == depth {
215
		return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
216 217
	}

218 219
	if err := appendFillLastChild(ctx, fsn, n, layerProgress, db); err != nil {
		return nil, 0, err
Jeromy's avatar
Jeromy committed
220 221 222 223 224 225 226 227 228 229
	}

	// after appendFillLastChild, our depth is now increased by one
	if !db.Done() {
		n++
	}

	// Now, continue filling out tree like normal
	for i := n; i < depth && !db.Done(); i++ {
		for j := 0; j < layerRepeat && !db.Done(); j++ {
230
			nextChild := db.NewFSNodeOverDag(ft.TFile)
Bamvor Zhang's avatar
Bamvor Zhang committed
231
			childNode, childFileSize, err := fillTrickleRec(db, nextChild, i)
232 233
			if err != nil {
				return nil, 0, err
Jeromy's avatar
Jeromy committed
234 235
			}

236 237
			if err := fsn.AddChild(childNode, childFileSize, db); err != nil {
				return nil, 0, err
Jeromy's avatar
Jeromy committed
238 239 240 241
			}
		}
	}

242
	return fsn, fsn.FileSize(), nil
Jeromy's avatar
Jeromy committed
243 244
}

Bamvor Zhang's avatar
Bamvor Zhang committed
245
func trickleDepthInfo(node *h.FSNodeOverDag, maxlinks int) (int, int) {
246 247 248 249 250 251 252 253
	n := node.NumChildren()
	if n < maxlinks {
		return 0, 0
	}

	return ((n - maxlinks) / layerRepeat) + 1, (n - maxlinks) % layerRepeat
}

Jeromy's avatar
Jeromy committed
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
// VerifyParams is used by VerifyTrickleDagStructure to describe the expected
// shape of a trickle DAG.
type VerifyParams struct {
	// Getter fetches each linked child node during verification.
	Getter      ipld.NodeGetter
	// Direct is the number of leading links of every branch node that must
	// point at data leaves.
	Direct      int
	// LayerRepeat is the number of subtrees per depth layer after the direct
	// leaves. The verifier divides by it, so it must be positive whenever the
	// DAG has non-direct children.
	LayerRepeat int
	// Prefix, when non-nil, is the expected CID prefix of every node. Its
	// Codec is replaced per node (protobuf or raw), Version is bumped to 1 for
	// raw leaves when it is 0, and an MhLength of -1 accepts any length.
	Prefix      *cid.Prefix
	// RawLeaves requires leaves to be raw nodes instead of protobuf nodes of
	// type TRaw.
	RawLeaves   bool
}

// VerifyTrickleDagStructure checks that the DAG rooted at nd exactly matches
// the trickle DAG layout described by p. It returns nil when the structure is
// valid and a descriptive error otherwise.
func VerifyTrickleDagStructure(nd ipld.Node, p VerifyParams) error {
	// The root has no depth limit; the recursive check expresses that with a
	// negative depth.
	const unlimitedDepth = -1
	return verifyTDagRec(nd, unlimitedDepth, p)
}

// Recursive call for verifying the structure of a trickledag
func verifyTDagRec(n ipld.Node, depth int, p VerifyParams) error {
	codec := cid.DagProtobuf
	if depth == 0 {
		if len(n.Links()) > 0 {
			return errors.New("expected direct block")
		}
		// zero depth dag is raw data block
		switch nd := n.(type) {
		case *dag.ProtoNode:
Overbool's avatar
Overbool committed
279
			fsn, err := ft.FSNodeFromBytes(nd.Data())
Jeromy's avatar
Jeromy committed
280 281 282 283
			if err != nil {
				return err
			}

Overbool's avatar
Overbool committed
284
			if fsn.Type() != ft.TRaw {
Jeromy's avatar
Jeromy committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
				return errors.New("expected raw block")
			}

			if p.RawLeaves {
				return errors.New("expected raw leaf, got a protobuf node")
			}
		case *dag.RawNode:
			if !p.RawLeaves {
				return errors.New("expected protobuf node as leaf")
			}
			codec = cid.Raw
		default:
			return errors.New("expected ProtoNode or RawNode")
		}
	}

	// verify prefix
	if p.Prefix != nil {
		prefix := n.Cid().Prefix()
		expect := *p.Prefix // make a copy
		expect.Codec = uint64(codec)
		if codec == cid.Raw && expect.Version == 0 {
			expect.Version = 1
		}
		if expect.MhLength == -1 {
			expect.MhLength = prefix.MhLength
		}
		if prefix != expect {
			return fmt.Errorf("unexpected cid prefix: expected: %v; got %v", expect, prefix)
		}
	}

	if depth == 0 {
		return nil
	}

	nd, ok := n.(*dag.ProtoNode)
	if !ok {
		return errors.New("expected ProtoNode")
	}

	// Verify this is a branch node
Overbool's avatar
Overbool committed
327
	fsn, err := ft.FSNodeFromBytes(nd.Data())
Jeromy's avatar
Jeromy committed
328 329 330 331
	if err != nil {
		return err
	}

Overbool's avatar
Overbool committed
332 333
	if fsn.Type() != ft.TFile {
		return fmt.Errorf("expected file as branch node, got: %s", fsn.Type())
Jeromy's avatar
Jeromy committed
334 335
	}

Overbool's avatar
Overbool committed
336
	if len(fsn.Data()) > 0 {
Jeromy's avatar
Jeromy committed
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
		return errors.New("branch node should not have data")
	}

	for i := 0; i < len(nd.Links()); i++ {
		child, err := nd.Links()[i].GetNode(context.TODO(), p.Getter)
		if err != nil {
			return err
		}

		if i < p.Direct {
			// Direct blocks
			err := verifyTDagRec(child, 0, p)
			if err != nil {
				return err
			}
		} else {
			// Recursive trickle dags
			rdepth := ((i - p.Direct) / p.LayerRepeat) + 1
			if rdepth >= depth && depth > 0 {
				return errors.New("child dag was too deep")
			}
			err := verifyTDagRec(child, rdepth, p)
			if err != nil {
				return err
			}
		}
	}
	return nil
}