// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

Jeromy's avatar
Jeromy committed
9 10 11 12 13
	blocks "github.com/ipfs/go-block-format"
	bserv "github.com/ipfs/go-blockservice"
	cid "github.com/ipfs/go-cid"
	ipldcbor "github.com/ipfs/go-ipld-cbor"
	ipld "github.com/ipfs/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

16 17 18 19
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
20 21 22
	ipld.Register(cid.DagProtobuf, DecodeProtobufBlock)
	ipld.Register(cid.Raw, DecodeRawBlock)
	ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
23 24
}

25 26 27 28 29
// contextKey is the type of the keys this package stores in a
// context.Context. Using a dedicated type keeps these keys distinct from
// keys of any other type.
type contextKey string

// progressContextKey is the context key under which a *ProgressTracker is
// stored by ProgressTracker.DeriveContext and looked up by
// FetchGraphWithDepthLimit.
const progressContextKey contextKey = "progress"

30
// NewDAGService constructs a new DAGService (using the default implementation).
31
// Note that the default implementation is also an ipld.LinkGetter.
32
func NewDAGService(bs bserv.BlockService) *dagService {
33
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
34 35
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(ctx context.Context, nd ipld.Node) error {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return fmt.Errorf("dagService is nil")
49 50
	}

51
	return n.Blocks.AddBlock(nd)
52 53
}

54
func (n *dagService) AddMany(ctx context.Context, nds []ipld.Node) error {
55 56 57
	blks := make([]blocks.Block, len(nds))
	for i, nd := range nds {
		blks[i] = nd
58
	}
59
	return n.Blocks.AddBlocks(blks)
60 61
}

62
// Get retrieves a node from the dagService, fetching the block in the BlockService
63
func (n *dagService) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
64
	if n == nil {
65
		return nil, fmt.Errorf("dagService is nil")
66
	}
Jeromy's avatar
Jeromy committed
67

68 69
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
70

Jeromy's avatar
Jeromy committed
71
	b, err := n.Blocks.GetBlock(ctx, c)
72
	if err != nil {
73
		if err == bserv.ErrNotFound {
74
			return nil, ipld.ErrNotFound
75
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
76
		return nil, fmt.Errorf("failed to get block for %s: %v", c, err)
77 78
	}

79
	return ipld.Decode(b)
80
}
Jeromy's avatar
Jeromy committed
81

82 83
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
84
func (n *dagService) GetLinks(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
85 86 87
	if c.Type() == cid.Raw {
		return nil, nil
	}
88 89 90 91
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
92
	return node.Links(), nil
93 94
}

95
func (n *dagService) Remove(ctx context.Context, c cid.Cid) error {
96
	return n.Blocks.DeleteBlock(c)
97 98
}

99 100 101 102 103
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
104
func (n *dagService) RemoveMany(ctx context.Context, cids []cid.Cid) error {
105 106 107 108 109 110 111
	// TODO(#4608): make this batch all the way down.
	for _, c := range cids {
		if err := n.Blocks.DeleteBlock(c); err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
112 113
}

114 115 116
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
117
func GetLinksDirect(serv ipld.NodeGetter) GetLinks {
118
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
119
		nd, err := serv.Get(ctx, c)
120
		if err != nil {
Jeromy's avatar
Jeromy committed
121
			if err == bserv.ErrNotFound {
122
				err = ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
123
			}
124 125
			return nil, err
		}
126
		return nd.Links(), nil
127 128 129
	}
}

130 131 132 133
// sesGetter is a node getter that fetches its blocks through a block
// service session.
type sesGetter struct {
	// bs is the session used for every block fetch.
	bs *bserv.Session
}

134
// Get gets a single node from the DAG.
135
func (sg *sesGetter) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
136
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
137 138
	switch err {
	case bserv.ErrNotFound:
139
		return nil, ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
140
	default:
141
		return nil, err
Jeromy's avatar
Jeromy committed
142 143
	case nil:
		// noop
144 145
	}

146
	return ipld.Decode(blk)
147 148
}

149
// GetMany gets many nodes at once, batching the request if possible.
150
func (sg *sesGetter) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
151 152 153
	return getNodesFromBG(ctx, sg.bs, keys)
}

Jeromy's avatar
Jeromy committed
154
// Session returns a NodeGetter using a new session for block fetches.
155 156
func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {
	return &sesGetter{bserv.NewSession(ctx, n.Blocks)}
Jeromy's avatar
Jeromy committed
157 158
}

159
// FetchGraph fetches all nodes that are children of the given node
160
func FetchGraph(ctx context.Context, root cid.Cid, serv ipld.DAGService) error {
161 162 163 164
	return FetchGraphWithDepthLimit(ctx, root, -1, serv)
}

// FetchGraphWithDepthLimit fetches all nodes that are children to the given
165
// node down to the given depth. maxDepth=0 means "only fetch root",
166 167
// maxDepth=1 means "fetch root and its direct children" and so on...
// maxDepth=-1 means unlimited.
168
func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, serv ipld.DAGService) error {
169
	var ng ipld.NodeGetter = serv
170 171
	ds, ok := serv.(*dagService)
	if ok {
172
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
173 174
	}

Steven Allen's avatar
Steven Allen committed
175
	set := make(map[cid.Cid]int)
176 177 178 179 180 181 182 183

	// Visit function returns true when:
	// * The element is not in the set and we're not over depthLim
	// * The element is in the set but recorded depth is deeper
	//   than currently seen (if we find it higher in the tree we'll need
	//   to explore deeper than before).
	// depthLim = -1 means we only return true if the element is not in the
	// set.
184
	visit := func(c cid.Cid, depth int) bool {
Steven Allen's avatar
Steven Allen committed
185
		oldDepth, ok := set[c]
186 187 188 189 190 191

		if (ok && depthLim < 0) || (depthLim >= 0 && depth > depthLim) {
			return false
		}

		if !ok || oldDepth > depth {
Steven Allen's avatar
Steven Allen committed
192
			set[c] = depth
193 194 195 196 197
			return true
		}
		return false
	}

198
	// If we have a ProgressTracker, we wrap the visit function to handle it
199
	v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
200
	if v == nil {
201
		return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent(), WithRoot())
202
	}
203

204
	visitProgress := func(c cid.Cid, depth int) bool {
205
		if visit(c, depth) {
206 207 208
			v.Increment()
			return true
		}
209
		return false
210
	}
211
	return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent(), WithRoot())
Jeromy's avatar
Jeromy committed
212
}
213

214 215 216 217 218
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
219
func (n *dagService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
220
	return getNodesFromBG(ctx, n.Blocks, keys)
221 222
}

223
func dedupKeys(keys []cid.Cid) []cid.Cid {
Steven Allen's avatar
Steven Allen committed
224 225 226 227 228 229 230 231 232 233
	set := cid.NewSet()
	for _, c := range keys {
		set.Add(c)
	}
	if set.Len() == len(keys) {
		return keys
	}
	return set.Keys()
}

234
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []cid.Cid) <-chan *ipld.NodeOption {
Steven Allen's avatar
Steven Allen committed
235 236
	keys = dedupKeys(keys)

237
	out := make(chan *ipld.NodeOption, len(keys))
238
	blocks := bs.GetBlocks(ctx, keys)
239 240
	var count int

241 242 243 244 245 246
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
247
					if count != len(keys) {
248
						out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
249
					}
250 251
					return
				}
Jeromy's avatar
Jeromy committed
252

253
				nd, err := ipld.Decode(b)
254
				if err != nil {
255
					out <- &ipld.NodeOption{Err: err}
256 257
					return
				}
Jeromy's avatar
Jeromy committed
258

259
				out <- &ipld.NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
260 261
				count++

262
			case <-ctx.Done():
263
				out <- &ipld.NodeOption{Err: ctx.Err()}
Jeromy's avatar
Jeromy committed
264
				return
265 266 267
			}
		}
	}()
268
	return out
269 270
}

271 272
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node.
273
type GetLinks func(context.Context, cid.Cid) ([]*ipld.Link, error)
Jeromy's avatar
Jeromy committed
274

275 276 277 278
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
279
func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
280
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
281
		return ipld.GetLinks(ctx, ng, c)
Jeromy's avatar
Jeromy committed
282
	}
283
}
284

285 286 287 288
// defaultConcurrentFetch is the concurrency factor selected by the Concurrent
// walk option, i.e. the number of fetcher goroutines a parallel walk starts
// when no explicit value is given.
const defaultConcurrentFetch = 32

289 290
// walkOptions represent the parameters of a graph walking algorithm
type walkOptions struct {
291 292
	WithRoot     bool
	Concurrency  int
293
	ErrorHandler func(c cid.Cid, err error) error
294 295
}

296 297 298 299 300 301 302 303 304 305 306 307
// WalkOption is a setter for walkOptions. Walk and WalkDepth accept any
// number of them and apply them in order to a fresh walkOptions value.
type WalkOption func(*walkOptions)

// addHandler appends handler to the walk's error-handling chain.
//
// The first handler added is installed as is. Every later handler is called
// with the result of the handlers installed before it, so it may receive a
// nil error if an earlier handler already swallowed the failure.
func (wo *walkOptions) addHandler(handler func(c cid.Cid, err error) error) {
	// Capture the current handler in a local. Reading wo.ErrorHandler inside
	// the new closure would resolve, at call time, to the closure itself and
	// recurse until the stack overflows.
	prev := wo.ErrorHandler
	if prev == nil {
		wo.ErrorHandler = handler
		return
	}
	wo.ErrorHandler = func(c cid.Cid, err error) error {
		return handler(c, prev(c, err))
	}
}
308 309 310

// WithRoot is a WalkOption indicating that the root node should be visited
func WithRoot() WalkOption {
311
	return func(walkOptions *walkOptions) {
312 313 314 315 316 317 318 319 320
		walkOptions.WithRoot = true
	}
}

// Concurrent is a WalkOption indicating that node fetching should be done in
// parallel, with the default concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrent() WalkOption {
321
	return func(walkOptions *walkOptions) {
322 323 324 325 326 327 328 329 330
		walkOptions.Concurrency = defaultConcurrentFetch
	}
}

// Concurrency is a WalkOption indicating that node fetching should be done in
// parallel, with a specific concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrency(worker int) WalkOption {
331
	return func(walkOptions *walkOptions) {
332 333 334 335
		walkOptions.Concurrency = worker
	}
}

336 337 338
// IgnoreErrors is a WalkOption indicating that the walk should attempt to
// continue even when an error occur.
func IgnoreErrors() WalkOption {
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
	return func(walkOptions *walkOptions) {
		walkOptions.addHandler(func(c cid.Cid, err error) error {
			return nil
		})
	}
}

// IgnoreMissing is a WalkOption indicating that the walk should continue when
// a node is missing. It adds an error handler that turns ipld.ErrNotFound
// into nil and passes every other error through.
func IgnoreMissing() WalkOption {
	skipMissing := func(_ cid.Cid, err error) error {
		if err != ipld.ErrNotFound {
			return err
		}
		return nil
	}
	return func(opts *walkOptions) {
		opts.addHandler(skipMissing)
	}
}

// OnMissing is a WalkOption adding a callback that is triggered with the CID
// of every missing node. The error itself is passed through unchanged.
func OnMissing(callback func(c cid.Cid)) WalkOption {
	notify := func(c cid.Cid, err error) error {
		if err == ipld.ErrNotFound {
			callback(c)
		}
		return err
	}
	return func(opts *walkOptions) {
		opts.addHandler(notify)
	}
}

// OnError is a WalkOption adding a custom error handler.
// If this handler return a nil error, the walk will continue.
func OnError(handler func(c cid.Cid, err error) error) WalkOption {
	return func(walkOptions *walkOptions) {
		walkOptions.addHandler(handler)
377 378 379
	}
}

380
// WalkGraph will walk the dag in order (depth first) starting at the given root.
381
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error {
382
	visitDepth := func(c cid.Cid, depth int) bool {
383 384 385
		return visit(c)
	}

386
	return WalkDepth(ctx, getLinks, c, visitDepth, options...)
387 388
}

389 390 391
// WalkDepth walks the dag starting at the given root and passes the current
// depth to a given visit function. The visit function can be used to limit DAG
// exploration.
392
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
393
	opts := &walkOptions{}
394 395 396 397 398 399 400 401 402 403 404
	for _, opt := range options {
		opt(opts)
	}

	if opts.Concurrency > 1 {
		return parallelWalkDepth(ctx, getLinks, c, visit, opts)
	} else {
		return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts)
	}
}

405
func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error {
406 407 408 409
	if depth != 0 || options.WithRoot {
		if !visit(root, depth) {
			return nil
		}
410 411
	}

412
	links, err := getLinks(ctx, root)
413 414 415 416
	if err != nil && options.ErrorHandler != nil {
		err = options.ErrorHandler(root, err)
	}
	if err != nil {
417 418
		return err
	}
419

420
	for _, lnk := range links {
421
		if err := sequentialWalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit, options); err != nil {
422
			return err
423 424 425 426
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
427

428
// ProgressTracker is used to show progress when fetching nodes.
429 430 431 432 433
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

434 435
// DeriveContext returns a new context with value "progress" derived from
// the given one.
436
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
437
	return context.WithValue(ctx, progressContextKey, p)
438 439
}

440
// Increment adds one to the total progress.
441 442 443 444 445 446
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.Total++
}

447
// Value returns the current progress.
448 449 450 451 452 453
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	defer p.lk.Unlock()
	return p.Total
}

454
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error {
455
	type cidDepth struct {
456
		cid   cid.Cid
457 458 459 460 461 462 463 464 465 466
		depth int
	}

	type linksDepth struct {
		links []*ipld.Link
		depth int
	}

	feed := make(chan *cidDepth)
	out := make(chan *linksDepth)
467 468
	done := make(chan struct{})

469
	var visitlk sync.Mutex
470
	var wg sync.WaitGroup
471

472 473
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
474
	defer wg.Wait()
475
	defer cancel()
476
	for i := 0; i < options.Concurrency; i++ {
477
		wg.Add(1)
478
		go func() {
479
			defer wg.Done()
480 481 482 483
			for cdepth := range feed {
				ci := cdepth.cid
				depth := cdepth.depth

484 485 486 487 488 489 490 491 492 493
				var shouldVisit bool

				// bypass the root if needed
				if depth != 0 || options.WithRoot {
					visitlk.Lock()
					shouldVisit = visit(ci, depth)
					visitlk.Unlock()
				} else {
					shouldVisit = true
				}
494

495
				if shouldVisit {
496
					links, err := getLinks(ctx, ci)
497 498 499 500
					if err != nil && options.ErrorHandler != nil {
						err = options.ErrorHandler(root, err)
					}
					if err != nil {
Steven Allen's avatar
Steven Allen committed
501 502 503 504
						select {
						case errChan <- err:
						case <-fetchersCtx.Done():
						}
505 506 507
						return
					}

508 509 510 511 512
					outLinks := &linksDepth{
						links: links,
						depth: depth + 1,
					}

513
					select {
514
					case out <- outLinks:
515
					case <-fetchersCtx.Done():
516 517 518
						return
					}
				}
Jeromy's avatar
Jeromy committed
519
				select {
520
				case done <- struct{}{}:
521
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
522 523
				}
			}
524
		}()
Jeromy's avatar
Jeromy committed
525
	}
526
	defer close(feed)
Jeromy's avatar
Jeromy committed
527

528
	send := feed
529
	var todoQueue []*cidDepth
530
	var inProgress int
Jeromy's avatar
Jeromy committed
531

532
	next := &cidDepth{
533 534
		cid:   root,
		depth: 0,
535
	}
536

537 538 539 540
	for {
		select {
		case send <- next:
			inProgress++
541 542 543
			if len(todoQueue) > 0 {
				next = todoQueue[0]
				todoQueue = todoQueue[1:]
544 545 546 547 548 549 550 551 552
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
553 554 555 556 557 558 559
		case linksDepth := <-out:
			for _, lnk := range linksDepth.links {
				cd := &cidDepth{
					cid:   lnk.Cid,
					depth: linksDepth.depth,
				}

560
				if next == nil {
561
					next = cd
562 563
					send = feed
				} else {
564
					todoQueue = append(todoQueue, cd)
565
				}
Jeromy's avatar
Jeromy committed
566
			}
567 568
		case err := <-errChan:
			return err
569

570
		case <-ctx.Done():
571
			return ctx.Err()
572
		}
Jeromy's avatar
Jeromy committed
573 574
	}
}
575

576 577 578 579
// Compile-time checks that the types in this file satisfy the interfaces
// they are used as.
var (
	_ ipld.LinkGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*sesGetter)(nil)
	_ ipld.DAGService = (*dagService)(nil)
)