merkledag.go 13.5 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

Jeromy's avatar
Jeromy committed
9 10 11 12 13
	blocks "github.com/ipfs/go-block-format"
	bserv "github.com/ipfs/go-blockservice"
	cid "github.com/ipfs/go-cid"
	ipldcbor "github.com/ipfs/go-ipld-cbor"
	ipld "github.com/ipfs/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

16 17 18 19
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
20 21 22
	ipld.Register(cid.DagProtobuf, DecodeProtobufBlock)
	ipld.Register(cid.Raw, DecodeRawBlock)
	ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
23 24
}

25 26 27 28 29
// contextKey is the type of the key under which a ProgressTracker is stored
// in a context.
type contextKey string

// progressContextKey is the context key used by ProgressTracker.DeriveContext
// to store the tracker and by FetchGraphWithDepthLimit to look it up.
const progressContextKey contextKey = "progress"

30
// NewDAGService constructs a new DAGService (using the default implementation).
31
// Note that the default implementation is also an ipld.LinkGetter.
32
func NewDAGService(bs bserv.BlockService) *dagService {
33
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
34 35
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(ctx context.Context, nd ipld.Node) error {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return fmt.Errorf("dagService is nil")
49 50
	}

51
	return n.Blocks.AddBlock(nd)
52 53
}

54
func (n *dagService) AddMany(ctx context.Context, nds []ipld.Node) error {
55 56 57
	blks := make([]blocks.Block, len(nds))
	for i, nd := range nds {
		blks[i] = nd
58
	}
59
	return n.Blocks.AddBlocks(blks)
60 61
}

62
// Get retrieves a node from the dagService, fetching the block in the BlockService
63
func (n *dagService) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
64
	if n == nil {
65
		return nil, fmt.Errorf("dagService is nil")
66
	}
Jeromy's avatar
Jeromy committed
67

68 69
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
70

Jeromy's avatar
Jeromy committed
71
	b, err := n.Blocks.GetBlock(ctx, c)
72
	if err != nil {
73
		if err == bserv.ErrNotFound {
74
			return nil, ipld.ErrNotFound
75
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
76
		return nil, fmt.Errorf("failed to get block for %s: %v", c, err)
77 78
	}

79
	return ipld.Decode(b)
80
}
Jeromy's avatar
Jeromy committed
81

82 83
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
84
func (n *dagService) GetLinks(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
85 86 87
	if c.Type() == cid.Raw {
		return nil, nil
	}
88 89 90 91
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
92
	return node.Links(), nil
93 94
}

95
func (n *dagService) Remove(ctx context.Context, c cid.Cid) error {
96
	return n.Blocks.DeleteBlock(c)
97 98
}

99 100 101 102 103
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
104
func (n *dagService) RemoveMany(ctx context.Context, cids []cid.Cid) error {
105 106 107 108 109 110 111
	// TODO(#4608): make this batch all the way down.
	for _, c := range cids {
		if err := n.Blocks.DeleteBlock(c); err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
112 113
}

114 115 116
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
117
func GetLinksDirect(serv ipld.NodeGetter) GetLinks {
118
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
119
		nd, err := serv.Get(ctx, c)
120
		if err != nil {
Jeromy's avatar
Jeromy committed
121
			if err == bserv.ErrNotFound {
122
				err = ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
123
			}
124 125
			return nil, err
		}
126
		return nd.Links(), nil
127 128 129
	}
}

130 131 132 133
// sesGetter is a node getter that fetches its blocks through a blockservice
// session.
type sesGetter struct {
	// bs is the session used for every block fetch made by this getter.
	bs *bserv.Session
}

134
// Get gets a single node from the DAG.
135
func (sg *sesGetter) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
136
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
137 138
	switch err {
	case bserv.ErrNotFound:
139
		return nil, ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
140
	default:
141
		return nil, err
Jeromy's avatar
Jeromy committed
142 143
	case nil:
		// noop
144 145
	}

146
	return ipld.Decode(blk)
147 148
}

149
// GetMany gets many nodes at once, batching the request if possible.
150
func (sg *sesGetter) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
151 152 153
	return getNodesFromBG(ctx, sg.bs, keys)
}

Jeromy's avatar
Jeromy committed
154
// Session returns a NodeGetter using a new session for block fetches.
155 156
func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {
	return &sesGetter{bserv.NewSession(ctx, n.Blocks)}
Jeromy's avatar
Jeromy committed
157 158
}

159
// FetchGraph fetches all nodes that are children of the given node
160
func FetchGraph(ctx context.Context, root cid.Cid, serv ipld.DAGService) error {
161 162 163 164
	return FetchGraphWithDepthLimit(ctx, root, -1, serv)
}

// FetchGraphWithDepthLimit fetches all nodes that are children to the given
165
// node down to the given depth. maxDepth=0 means "only fetch root",
166 167
// maxDepth=1 means "fetch root and its direct children" and so on...
// maxDepth=-1 means unlimited.
168
func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, serv ipld.DAGService) error {
169
	var ng ipld.NodeGetter = serv
170 171
	ds, ok := serv.(*dagService)
	if ok {
172
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
173 174
	}

Steven Allen's avatar
Steven Allen committed
175
	set := make(map[cid.Cid]int)
176 177 178 179 180 181 182 183

	// Visit function returns true when:
	// * The element is not in the set and we're not over depthLim
	// * The element is in the set but recorded depth is deeper
	//   than currently seen (if we find it higher in the tree we'll need
	//   to explore deeper than before).
	// depthLim = -1 means we only return true if the element is not in the
	// set.
184
	visit := func(c cid.Cid, depth int) bool {
Steven Allen's avatar
Steven Allen committed
185
		oldDepth, ok := set[c]
186 187 188 189 190 191

		if (ok && depthLim < 0) || (depthLim >= 0 && depth > depthLim) {
			return false
		}

		if !ok || oldDepth > depth {
Steven Allen's avatar
Steven Allen committed
192
			set[c] = depth
193 194 195 196 197
			return true
		}
		return false
	}

198
	// If we have a ProgressTracker, we wrap the visit function to handle it
199
	v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
200
	if v == nil {
201
		return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent(), WithRoot())
202
	}
203

204
	visitProgress := func(c cid.Cid, depth int) bool {
205
		if visit(c, depth) {
206 207 208
			v.Increment()
			return true
		}
209
		return false
210
	}
211
	return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent(), WithRoot())
Jeromy's avatar
Jeromy committed
212
}
213

214 215 216 217 218
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
219
func (n *dagService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
220
	return getNodesFromBG(ctx, n.Blocks, keys)
221 222
}

223
func dedupKeys(keys []cid.Cid) []cid.Cid {
Steven Allen's avatar
Steven Allen committed
224 225 226 227 228 229 230 231 232 233
	set := cid.NewSet()
	for _, c := range keys {
		set.Add(c)
	}
	if set.Len() == len(keys) {
		return keys
	}
	return set.Keys()
}

234
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []cid.Cid) <-chan *ipld.NodeOption {
Steven Allen's avatar
Steven Allen committed
235 236
	keys = dedupKeys(keys)

237
	out := make(chan *ipld.NodeOption, len(keys))
238
	blocks := bs.GetBlocks(ctx, keys)
239 240
	var count int

241 242 243 244 245 246
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
247
					if count != len(keys) {
248
						out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
249
					}
250 251
					return
				}
Jeromy's avatar
Jeromy committed
252

253
				nd, err := ipld.Decode(b)
254
				if err != nil {
255
					out <- &ipld.NodeOption{Err: err}
256 257
					return
				}
Jeromy's avatar
Jeromy committed
258

259
				out <- &ipld.NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
260 261
				count++

262
			case <-ctx.Done():
263
				out <- &ipld.NodeOption{Err: ctx.Err()}
Jeromy's avatar
Jeromy committed
264
				return
265 266 267
			}
		}
	}()
268
	return out
269 270
}

271 272
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node.
273
type GetLinks func(context.Context, cid.Cid) ([]*ipld.Link, error)
Jeromy's avatar
Jeromy committed
274

275 276 277 278
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
279
func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
280
	return func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
281
		return ipld.GetLinks(ctx, ng, c)
Jeromy's avatar
Jeromy committed
282
	}
283
}
284

285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
// defaultConcurrentFetch is the default maximum number of concurrent fetches
// that 'fetchNodes' will start at a time
const defaultConcurrentFetch = 32

// WalkOptions represent the parameters of a graph walking algorithm
type WalkOptions struct {
	WithRoot       bool
	IgnoreBadBlock bool
	Concurrency    int
}

// WalkOption is a setter for WalkOptions
type WalkOption func(*WalkOptions)

// WithRoot is a WalkOption indicating that the root node should be visited
func WithRoot() WalkOption {
	return func(walkOptions *WalkOptions) {
		walkOptions.WithRoot = true
	}
}

// Concurrent is a WalkOption indicating that node fetching should be done in
// parallel, with the default concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrent() WalkOption {
	return func(walkOptions *WalkOptions) {
		walkOptions.Concurrency = defaultConcurrentFetch
	}
}

// Concurrency is a WalkOption indicating that node fetching should be done in
// parallel, with a specific concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrency(worker int) WalkOption {
	return func(walkOptions *WalkOptions) {
		walkOptions.Concurrency = worker
	}
}

326
// WalkGraph will walk the dag in order (depth first) starting at the given root.
327
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error {
328
	visitDepth := func(c cid.Cid, depth int) bool {
329 330 331
		return visit(c)
	}

332
	return WalkDepth(ctx, getLinks, c, visitDepth, options...)
333 334
}

335 336 337
// WalkDepth walks the dag starting at the given root and passes the current
// depth to a given visit function. The visit function can be used to limit DAG
// exploration.
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
	opts := &WalkOptions{}
	for _, opt := range options {
		opt(opts)
	}

	if opts.Concurrency > 1 {
		return parallelWalkDepth(ctx, getLinks, c, visit, opts)
	} else {
		return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts)
	}
}

func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error {
	if depth != 0 || options.WithRoot {
		if !visit(root, depth) {
			return nil
		}
356 357
	}

358 359
	links, err := getLinks(ctx, root)
	if err != nil {
360 361
		return err
	}
362

363
	for _, lnk := range links {
364
		if err := sequentialWalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit, options); err != nil {
365
			return err
366 367 368 369
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
370

371
// ProgressTracker is used to show progress when fetching nodes.
372 373 374 375 376
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

377 378
// DeriveContext returns a new context with value "progress" derived from
// the given one.
379
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
380
	return context.WithValue(ctx, progressContextKey, p)
381 382
}

383
// Increment adds one to the total progress.
384 385 386 387 388 389
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.Total++
}

390
// Value returns the current progress.
391 392 393 394 395 396
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	defer p.lk.Unlock()
	return p.Total
}

397
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error {
398
	type cidDepth struct {
399
		cid   cid.Cid
400 401 402 403 404 405 406 407 408 409
		depth int
	}

	type linksDepth struct {
		links []*ipld.Link
		depth int
	}

	feed := make(chan *cidDepth)
	out := make(chan *linksDepth)
410 411
	done := make(chan struct{})

412
	var visitlk sync.Mutex
413
	var wg sync.WaitGroup
414

415 416
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
417
	defer wg.Wait()
418
	defer cancel()
419
	for i := 0; i < options.Concurrency; i++ {
420
		wg.Add(1)
421
		go func() {
422
			defer wg.Done()
423 424 425 426
			for cdepth := range feed {
				ci := cdepth.cid
				depth := cdepth.depth

427 428 429 430 431 432 433 434 435 436
				var shouldVisit bool

				// bypass the root if needed
				if depth != 0 || options.WithRoot {
					visitlk.Lock()
					shouldVisit = visit(ci, depth)
					visitlk.Unlock()
				} else {
					shouldVisit = true
				}
437

438
				if shouldVisit {
439
					links, err := getLinks(ctx, ci)
440
					if err != nil {
Steven Allen's avatar
Steven Allen committed
441 442 443 444
						select {
						case errChan <- err:
						case <-fetchersCtx.Done():
						}
445 446 447
						return
					}

448 449 450 451 452
					outLinks := &linksDepth{
						links: links,
						depth: depth + 1,
					}

453
					select {
454
					case out <- outLinks:
455
					case <-fetchersCtx.Done():
456 457 458
						return
					}
				}
Jeromy's avatar
Jeromy committed
459
				select {
460
				case done <- struct{}{}:
461
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
462 463
				}
			}
464
		}()
Jeromy's avatar
Jeromy committed
465
	}
466
	defer close(feed)
Jeromy's avatar
Jeromy committed
467

468
	send := feed
469
	var todoQueue []*cidDepth
470
	var inProgress int
Jeromy's avatar
Jeromy committed
471

472
	next := &cidDepth{
473 474
		cid:   root,
		depth: 0,
475
	}
476

477 478 479 480
	for {
		select {
		case send <- next:
			inProgress++
481 482 483
			if len(todoQueue) > 0 {
				next = todoQueue[0]
				todoQueue = todoQueue[1:]
484 485 486 487 488 489 490 491 492
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
493 494 495 496 497 498 499
		case linksDepth := <-out:
			for _, lnk := range linksDepth.links {
				cd := &cidDepth{
					cid:   lnk.Cid,
					depth: linksDepth.depth,
				}

500
				if next == nil {
501
					next = cd
502 503
					send = feed
				} else {
504
					todoQueue = append(todoQueue, cd)
505
				}
Jeromy's avatar
Jeromy committed
506
			}
507 508
		case err := <-errChan:
			return err
509

510
		case <-ctx.Done():
511
			return ctx.Err()
512
		}
Jeromy's avatar
Jeromy committed
513 514
	}
}
515

516 517 518 519
// Compile-time checks that the concrete types satisfy the ipld interfaces.
var (
	_ ipld.LinkGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*sesGetter)(nil)
	_ ipld.DAGService = (*dagService)(nil)
)