// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

Jeromy's avatar
Jeromy committed
9 10 11 12 13
	blocks "github.com/ipfs/go-block-format"
	bserv "github.com/ipfs/go-blockservice"
	cid "github.com/ipfs/go-cid"
	ipldcbor "github.com/ipfs/go-ipld-cbor"
	ipld "github.com/ipfs/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

16 17 18 19
// init registers the block decoders for the DagProtobuf, Raw and DagCBOR
// codecs with the ipld package.
//
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
	ipld.Register(cid.DagProtobuf, DecodeProtobufBlock)
	ipld.Register(cid.Raw, DecodeRawBlock)
	ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

25 26 27 28 29
// contextKey is the type used for the context keys defined by this package
// (currently only the ProgressTracker key). Using a dedicated type keeps these
// keys from colliding with context keys defined elsewhere.
type contextKey string

// progressContextKey is the context key under which ProgressTracker.DeriveContext
// stores the tracker and from which FetchGraphWithDepthLimit reads it back.
const progressContextKey contextKey = "progress"

30
// NewDAGService constructs a new DAGService (using the default implementation).
31
// Note that the default implementation is also an ipld.LinkGetter.
32
func NewDAGService(bs bserv.BlockService) *dagService {
33
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
34 35
}

36
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
type dagService struct {
	// Blocks is the BlockService through which nodes are added, fetched and
	// deleted.
	Blocks bserv.BlockService
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(ctx context.Context, nd ipld.Node) error {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return fmt.Errorf("dagService is nil")
49 50
	}

51
	return n.Blocks.AddBlock(nd)
52 53
}

54
func (n *dagService) AddMany(ctx context.Context, nds []ipld.Node) error {
55 56 57
	blks := make([]blocks.Block, len(nds))
	for i, nd := range nds {
		blks[i] = nd
58
	}
59
	return n.Blocks.AddBlocks(blks)
60 61
}

62
// Get retrieves a node from the dagService, fetching the block in the BlockService
63
func (n *dagService) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
64
	if n == nil {
65
		return nil, fmt.Errorf("dagService is nil")
66
	}
Jeromy's avatar
Jeromy committed
67

68 69
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
70

Jeromy's avatar
Jeromy committed
71
	b, err := n.Blocks.GetBlock(ctx, c)
72
	if err != nil {
73
		if err == bserv.ErrNotFound {
74
			return nil, ipld.ErrNotFound
75
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
76
		return nil, fmt.Errorf("failed to get block for %s: %v", c, err)
77 78
	}

79
	return ipld.Decode(b)
80
}
Jeromy's avatar
Jeromy committed
81

82 83
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
84
func (n *dagService) GetLinks(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
85 86 87
	if c.Type() == cid.Raw {
		return nil, nil
	}
88 89 90 91
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
92
	return node.Links(), nil
93 94
}

95
func (n *dagService) Remove(ctx context.Context, c cid.Cid) error {
96
	return n.Blocks.DeleteBlock(c)
97 98
}

99 100 101 102 103
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
104
func (n *dagService) RemoveMany(ctx context.Context, cids []cid.Cid) error {
105 106 107 108 109 110 111
	// TODO(#4608): make this batch all the way down.
	for _, c := range cids {
		if err := n.Blocks.DeleteBlock(c); err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
112 113
}

114 115 116
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
117
func GetLinksDirect(serv ipld.NodeGetter) GetLinks {
118
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
119
		nd, err := serv.Get(ctx, c)
120
		if err != nil {
Jeromy's avatar
Jeromy committed
121
			if err == bserv.ErrNotFound {
122
				err = ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
123
			}
124 125
			return nil, err
		}
126
		return nd.Links(), nil
127 128 129
	}
}

130 131 132 133
// sesGetter is an ipld.NodeGetter that fetches its blocks through a
// blockservice session.
type sesGetter struct {
	// bs is the session used for all block fetches made by this getter.
	bs *bserv.Session
}

134
// Get gets a single node from the DAG.
135
func (sg *sesGetter) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
136
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
137 138
	switch err {
	case bserv.ErrNotFound:
139
		return nil, ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
140
	default:
141
		return nil, err
Jeromy's avatar
Jeromy committed
142 143
	case nil:
		// noop
144 145
	}

146
	return ipld.Decode(blk)
147 148
}

149
// GetMany gets many nodes at once, batching the request if possible.
150
func (sg *sesGetter) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
151 152 153
	return getNodesFromBG(ctx, sg.bs, keys)
}

Jeromy's avatar
Jeromy committed
154
// Session returns a NodeGetter using a new session for block fetches.
155 156
func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {
	return &sesGetter{bserv.NewSession(ctx, n.Blocks)}
Jeromy's avatar
Jeromy committed
157 158
}

159
// FetchGraph fetches all nodes that are children of the given node
160
func FetchGraph(ctx context.Context, root cid.Cid, serv ipld.DAGService) error {
161 162 163 164
	return FetchGraphWithDepthLimit(ctx, root, -1, serv)
}

// FetchGraphWithDepthLimit fetches all nodes that are children to the given
165
// node down to the given depth. maxDepth=0 means "only fetch root",
166 167
// maxDepth=1 means "fetch root and its direct children" and so on...
// maxDepth=-1 means unlimited.
168
func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, serv ipld.DAGService) error {
169
	var ng ipld.NodeGetter = serv
170 171
	ds, ok := serv.(*dagService)
	if ok {
172
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
173 174
	}

Steven Allen's avatar
Steven Allen committed
175
	set := make(map[cid.Cid]int)
176 177 178 179 180 181 182 183

	// Visit function returns true when:
	// * The element is not in the set and we're not over depthLim
	// * The element is in the set but recorded depth is deeper
	//   than currently seen (if we find it higher in the tree we'll need
	//   to explore deeper than before).
	// depthLim = -1 means we only return true if the element is not in the
	// set.
184
	visit := func(c cid.Cid, depth int) bool {
Steven Allen's avatar
Steven Allen committed
185
		oldDepth, ok := set[c]
186 187 188 189 190 191

		if (ok && depthLim < 0) || (depthLim >= 0 && depth > depthLim) {
			return false
		}

		if !ok || oldDepth > depth {
Steven Allen's avatar
Steven Allen committed
192
			set[c] = depth
193 194 195 196 197
			return true
		}
		return false
	}

198
	// If we have a ProgressTracker, we wrap the visit function to handle it
199
	v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
200
	if v == nil {
201
		return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent(), WithRoot())
202
	}
203

204
	visitProgress := func(c cid.Cid, depth int) bool {
205
		if visit(c, depth) {
206 207 208
			v.Increment()
			return true
		}
209
		return false
210
	}
211
	return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent(), WithRoot())
Jeromy's avatar
Jeromy committed
212
}
213

214 215 216 217 218
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
219
func (n *dagService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
220
	return getNodesFromBG(ctx, n.Blocks, keys)
221 222
}

223
func dedupKeys(keys []cid.Cid) []cid.Cid {
Steven Allen's avatar
Steven Allen committed
224 225 226 227 228 229 230 231 232 233
	set := cid.NewSet()
	for _, c := range keys {
		set.Add(c)
	}
	if set.Len() == len(keys) {
		return keys
	}
	return set.Keys()
}

234
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []cid.Cid) <-chan *ipld.NodeOption {
Steven Allen's avatar
Steven Allen committed
235 236
	keys = dedupKeys(keys)

237
	out := make(chan *ipld.NodeOption, len(keys))
238
	blocks := bs.GetBlocks(ctx, keys)
239 240
	var count int

241 242 243 244 245 246
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
247
					if count != len(keys) {
248
						out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
249
					}
250 251
					return
				}
Jeromy's avatar
Jeromy committed
252

253
				nd, err := ipld.Decode(b)
254
				if err != nil {
255
					out <- &ipld.NodeOption{Err: err}
256 257
					return
				}
Jeromy's avatar
Jeromy committed
258

259
				out <- &ipld.NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
260 261
				count++

262
			case <-ctx.Done():
263
				out <- &ipld.NodeOption{Err: ctx.Err()}
Jeromy's avatar
Jeromy committed
264
				return
265 266 267
			}
		}
	}()
268
	return out
269 270
}

271 272
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node. In this file it is the link
// source consumed by Walk and WalkDepth: given a CID it returns the links of
// the corresponding node, or an error.
type GetLinks func(context.Context, cid.Cid) ([]*ipld.Link, error)
Jeromy's avatar
Jeromy committed
274

275 276 277 278
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
279
func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
280
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
281
		return ipld.GetLinks(ctx, ng, c)
Jeromy's avatar
Jeromy committed
282
	}
283
}
284

285 286 287 288 289 290
// defaultConcurrentFetch is the default maximum number of concurrent fetches
// that a walk started with the Concurrent option will run at a time.
const defaultConcurrentFetch = 32

// WalkOptions represent the parameters of a graph walking algorithm.
type WalkOptions struct {
	// WithRoot makes the walk call the visit function on the root node too.
	WithRoot bool
	// IgnoreErrors makes the walk continue past errors returned while
	// getting links.
	IgnoreErrors bool
	// Concurrency is the number of parallel fetchers; the walk is done
	// sequentially unless it is greater than 1.
	Concurrency int
}

// WalkOption is a setter for WalkOptions.
type WalkOption func(*WalkOptions)

// WithRoot is a WalkOption indicating that the root node should be visited.
func WithRoot() WalkOption {
	return func(o *WalkOptions) { o.WithRoot = true }
}

// Concurrent is a WalkOption indicating that node fetching should be done in
// parallel, with the default concurrency factor.
// NOTE: When using that option, the walk order is *not* guaranteed.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrent() WalkOption {
	return Concurrency(defaultConcurrentFetch)
}

// Concurrency is a WalkOption indicating that node fetching should be done in
// parallel, with a specific concurrency factor.
// NOTE: When using that option, the walk order is *not* guaranteed.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrency(worker int) WalkOption {
	return func(o *WalkOptions) { o.Concurrency = worker }
}

// IgnoreErrors is a WalkOption indicating that the walk should attempt to
// continue even when an error occurs.
func IgnoreErrors() WalkOption {
	return func(o *WalkOptions) { o.IgnoreErrors = true }
}

334
// WalkGraph will walk the dag in order (depth first) starting at the given root.
335
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error {
336
	visitDepth := func(c cid.Cid, depth int) bool {
337 338 339
		return visit(c)
	}

340
	return WalkDepth(ctx, getLinks, c, visitDepth, options...)
341 342
}

343 344 345
// WalkDepth walks the dag starting at the given root and passes the current
// depth to a given visit function. The visit function can be used to limit DAG
// exploration.
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
	opts := &WalkOptions{}
	for _, opt := range options {
		opt(opts)
	}

	if opts.Concurrency > 1 {
		return parallelWalkDepth(ctx, getLinks, c, visit, opts)
	} else {
		return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts)
	}
}

func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error {
	if depth != 0 || options.WithRoot {
		if !visit(root, depth) {
			return nil
		}
364 365
	}

366
	links, err := getLinks(ctx, root)
367
	if err != nil && !options.IgnoreErrors {
368 369
		return err
	}
370

371
	for _, lnk := range links {
372
		if err := sequentialWalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit, options); err != nil {
373
			return err
374 375 376 377
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
378

379
// ProgressTracker is used to show progress when fetching nodes.
380 381 382 383 384
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

385 386
// DeriveContext returns a new context with value "progress" derived from
// the given one.
387
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
388
	return context.WithValue(ctx, progressContextKey, p)
389 390
}

391
// Increment adds one to the total progress.
392 393 394 395 396 397
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.Total++
}

398
// Value returns the current progress.
399 400 401 402 403 404
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	defer p.lk.Unlock()
	return p.Total
}

405
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error {
406
	type cidDepth struct {
407
		cid   cid.Cid
408 409 410 411 412 413 414 415 416 417
		depth int
	}

	type linksDepth struct {
		links []*ipld.Link
		depth int
	}

	feed := make(chan *cidDepth)
	out := make(chan *linksDepth)
418 419
	done := make(chan struct{})

420
	var visitlk sync.Mutex
421
	var wg sync.WaitGroup
422

423 424
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
425
	defer wg.Wait()
426
	defer cancel()
427
	for i := 0; i < options.Concurrency; i++ {
428
		wg.Add(1)
429
		go func() {
430
			defer wg.Done()
431 432 433 434
			for cdepth := range feed {
				ci := cdepth.cid
				depth := cdepth.depth

435 436 437 438 439 440 441 442 443 444
				var shouldVisit bool

				// bypass the root if needed
				if depth != 0 || options.WithRoot {
					visitlk.Lock()
					shouldVisit = visit(ci, depth)
					visitlk.Unlock()
				} else {
					shouldVisit = true
				}
445

446
				if shouldVisit {
447
					links, err := getLinks(ctx, ci)
448
					if err != nil && !options.IgnoreErrors {
Steven Allen's avatar
Steven Allen committed
449 450 451 452
						select {
						case errChan <- err:
						case <-fetchersCtx.Done():
						}
453 454 455
						return
					}

456 457 458 459 460
					outLinks := &linksDepth{
						links: links,
						depth: depth + 1,
					}

461
					select {
462
					case out <- outLinks:
463
					case <-fetchersCtx.Done():
464 465 466
						return
					}
				}
Jeromy's avatar
Jeromy committed
467
				select {
468
				case done <- struct{}{}:
469
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
470 471
				}
			}
472
		}()
Jeromy's avatar
Jeromy committed
473
	}
474
	defer close(feed)
Jeromy's avatar
Jeromy committed
475

476
	send := feed
477
	var todoQueue []*cidDepth
478
	var inProgress int
Jeromy's avatar
Jeromy committed
479

480
	next := &cidDepth{
481 482
		cid:   root,
		depth: 0,
483
	}
484

485 486 487 488
	for {
		select {
		case send <- next:
			inProgress++
489 490 491
			if len(todoQueue) > 0 {
				next = todoQueue[0]
				todoQueue = todoQueue[1:]
492 493 494 495 496 497 498 499 500
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
501 502 503 504 505 506 507
		case linksDepth := <-out:
			for _, lnk := range linksDepth.links {
				cd := &cidDepth{
					cid:   lnk.Cid,
					depth: linksDepth.depth,
				}

508
				if next == nil {
509
					next = cd
510 511
					send = feed
				} else {
512
					todoQueue = append(todoQueue, cd)
513
				}
Jeromy's avatar
Jeromy committed
514
			}
515 516
		case err := <-errChan:
			return err
517

518
		case <-ctx.Done():
519
			return ctx.Err()
520
		}
Jeromy's avatar
Jeromy committed
521 522
	}
}
523

524 525 526 527
// Compile-time checks that the types in this file implement the ipld
// interfaces they are used as.
var _ ipld.LinkGetter = &dagService{}
var _ ipld.NodeGetter = &dagService{}
var _ ipld.NodeGetter = &sesGetter{}
var _ ipld.DAGService = &dagService{}