merkledag.go 11.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
11

12 13 14 15
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
	ipldcbor "gx/ipfs/QmcRu2X6kdDKmCbMpYXKHVgDrhLqVYCACMe1aghUcdHj2z/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19 20 21 22 23 24 25 26
// init registers a block decoder with the node package for each codec this
// package handles: DagProtobuf, Raw and DagCBOR.
//
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

27
// ErrNotFound is the error this package reports when a requested node cannot
// be found; it is returned in place of the block service's bserv.ErrNotFound.
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
28

Jeromy's avatar
Jeromy committed
29 30
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
31 32 33
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
34

Arthur Elliott's avatar
Arthur Elliott committed
35
	// GetMany returns a channel of NodeOption given
Arthur Elliott's avatar
Arthur Elliott committed
36
	// a set of CIDs.
Jeromy's avatar
Jeromy committed
37
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
38 39

	Batch() *Batch
40 41

	LinkService
Jeromy's avatar
Jeromy committed
42 43
}

44
type LinkService interface {
45 46 47 48
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
49
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
50 51

	GetOfflineLinkService() LinkService
52 53
}

54
func NewDAGService(bs bserv.BlockService) *dagService {
55
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
56 57
}

58
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
59 60
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
61 62
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
63
type dagService struct {
64
	Blocks bserv.BlockService
65 66
}

67
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
68
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
69
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
70
		return nil, fmt.Errorf("dagService is nil")
71 72
	}

73
	return n.Blocks.AddBlock(nd)
74 75
}

76
func (n *dagService) Batch() *Batch {
77 78 79 80 81 82 83 84 85
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
86 87
}

88
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
89
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
90
	if n == nil {
91
		return nil, fmt.Errorf("dagService is nil")
92
	}
Jeromy's avatar
Jeromy committed
93

94 95
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
96

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, c)
98
	if err != nil {
99 100 101
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
102
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
103 104
	}

105
	return node.Decode(b)
106
}
Jeromy's avatar
Jeromy committed
107

108 109
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
110
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
111 112 113
	if c.Type() == cid.Raw {
		return nil, nil
	}
114 115 116 117
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
118
	return node.Links(), nil
119 120
}

121
func (n *dagService) GetOfflineLinkService() LinkService {
122 123
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
124 125 126 127 128 129
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
130
func (n *dagService) Remove(nd node.Node) error {
131
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
132 133
}

134 135 136
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
137
func GetLinksDirect(serv node.NodeGetter) GetLinks {
138 139 140
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
141 142 143
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
144 145 146 147 148 149
			return nil, err
		}
		return node.Links(), nil
	}
}

150 151 152 153 154 155
// sesGetter fetches nodes through a block service session.
type sesGetter struct {
	// bs is the session that Get fetches blocks from.
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
156 157 158 159
	switch err {
	case bserv.ErrNotFound:
		return nil, ErrNotFound
	default:
160
		return nil, err
Jeromy's avatar
Jeromy committed
161 162
	case nil:
		// noop
163 164
	}

165
	return node.Decode(blk)
166 167
}

168
// FetchGraph fetches all nodes that are children of the given node
169
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
170 171 172
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
173
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
174 175
	}

176 177
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
178
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
179 180 181 182 183 184 185 186 187 188
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
189
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
190
}
191

Jeromy's avatar
Jeromy committed
192 193
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
194
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
195
	var out []int
Jeromy's avatar
Jeromy committed
196 197
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
198
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
199 200
		}
	}
Jeromy's avatar
Jeromy committed
201
	return out
Jeromy's avatar
Jeromy committed
202 203
}

204
type NodeOption struct {
Jeromy's avatar
Jeromy committed
205
	Node node.Node
206 207 208
	Err  error
}

Jeromy's avatar
Jeromy committed
209
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
210
	out := make(chan *NodeOption, len(keys))
211
	blocks := ds.Blocks.GetBlocks(ctx, keys)
212 213
	var count int

214 215 216 217 218 219
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
220
					if count != len(keys) {
221
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
222
					}
223 224
					return
				}
Jeromy's avatar
Jeromy committed
225

226
				nd, err := node.Decode(b)
227 228
				if err != nil {
					out <- &NodeOption{Err: err}
229 230
					return
				}
Jeromy's avatar
Jeromy committed
231

232
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
233 234
				count++

235
			case <-ctx.Done():
236
				out <- &NodeOption{Err: ctx.Err()}
237 238 239 240
				return
			}
		}
	}()
241
	return out
242 243
}

244
// GetDAG will fill out all of the links of the given Node.
245 246
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
247
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
248
	var cids []*cid.Cid
249 250
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
251 252
	}

Jeromy's avatar
Jeromy committed
253
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
254 255
}

Jeromy's avatar
Jeromy committed
256 257
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
258
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
259 260 261 262 263 264

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
265
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
266
	for i := range keys {
267
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
268 269
	}

270
	dedupedKeys := dedupeKeys(keys)
271
	go func() {
272 273 274
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

275
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
276

277
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
278
			select {
279
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
280
				if !ok {
Jeromy's avatar
Jeromy committed
281 282 283
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
284 285
					return
				}
Jeromy's avatar
Jeromy committed
286

287
				if opt.Err != nil {
288 289 290
					for _, p := range promises {
						p.Fail(opt.Err)
					}
291 292 293 294
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
295
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
296
				for _, i := range is {
297
					count++
298
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
299 300 301
				}
			case <-ctx.Done():
				return
302 303 304
			}
		}
	}()
Jeromy's avatar
Jeromy committed
305 306 307
	return promises
}

308
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
309 310 311 312
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
313
	}
Jeromy's avatar
Jeromy committed
314
	return set.Keys()
315 316
}

317
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
318
	return &nodePromise{
Jeromy's avatar
Jeromy committed
319
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
320
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
321
		err:  make(chan error, 1),
322
	}
Jeromy's avatar
Jeromy committed
323 324 325
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
326
	cache node.Node
327
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
328
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
329
	ctx   context.Context
Jeromy's avatar
Jeromy committed
330
	err   chan error
Jeromy's avatar
Jeromy committed
331 332
}

Jeromy's avatar
Jeromy committed
333 334 335 336
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
337
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
338
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
339
	Fail(err error)
Jeromy's avatar
Jeromy committed
340
	Send(node.Node)
Jeromy's avatar
Jeromy committed
341 342 343
}

func (np *nodePromise) Fail(err error) {
344 345 346 347 348 349 350 351 352
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
353
	np.err <- err
Jeromy's avatar
Jeromy committed
354 355
}

Jeromy's avatar
Jeromy committed
356
func (np *nodePromise) Send(nd node.Node) {
357 358
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
359
	if np.cache != nil {
360 361 362 363 364 365 366 367 368 369 370 371
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
372
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
373 374 375 376 377
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
378 379 380
	}

	select {
381 382
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
383 384
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
385 386
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
387 388
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
389
	}
390
}
391 392 393 394

type Batch struct {
	ds *dagService

395 396 397 398
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
399 400
}

Jeromy's avatar
Jeromy committed
401
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
402
	t.blocks = append(t.blocks, nd)
403
	t.size += len(nd.RawData())
404
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
405
		return nd.Cid(), t.Commit()
406
	}
Jeromy's avatar
Jeromy committed
407
	return nd.Cid(), nil
408 409 410
}

func (t *Batch) Commit() error {
411 412
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
413 414 415
	t.size = 0
	return err
}
416

417 418
// GetLinks is a function that returns the links of the node identified by the
// given CID.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

419 420 421
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
422 423 424
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
425 426
		return err
	}
427
	for _, lnk := range links {
428
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
429
		if visit(c) {
430
			err = EnumerateChildren(ctx, getLinks, c, visit)
431 432 433 434 435 436 437
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
438

439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
// ProgressTracker is a mutex-protected counter that can be attached to a
// context so that a fetch can report how many nodes it has processed.
type ProgressTracker struct {
	// Total is the current count; use Value to read it under the lock.
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the "progress"
// key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", p)
	return derived
}

// Increment adds one to the counter.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current count.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

460 461 462
// FetchGraphConcurrency is the number of fetcher goroutines that
// EnumerateChildrenAsync starts for each call.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
463

464
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
465
	feed := make(chan *cid.Cid)
466
	out := make(chan []*node.Link)
467 468 469
	done := make(chan struct{})

	var setlk sync.Mutex
470

471 472
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
473

474
	defer cancel()
475

476 477
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
478
			for ic := range feed {
479
				links, err := getLinks(ctx, ic)
480 481 482
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
483
				}
484

485 486 487
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
488

489
				if unseen {
490
					select {
491
					case out <- links:
492
					case <-fetchersCtx.Done():
493 494 495
						return
					}
				}
Jeromy's avatar
Jeromy committed
496
				select {
497
				case done <- struct{}{}:
498
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
499 500
				}
			}
501
		}()
Jeromy's avatar
Jeromy committed
502
	}
503
	defer close(feed)
Jeromy's avatar
Jeromy committed
504

505
	send := feed
506
	var todobuffer []*cid.Cid
507
	var inProgress int
Jeromy's avatar
Jeromy committed
508

509
	next := c
510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
526 527
		case links := <-out:
			for _, lnk := range links {
528 529 530 531 532 533
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
534
			}
535 536
		case err := <-errChan:
			return err
537

538
		case <-ctx.Done():
539
			return ctx.Err()
540
		}
Jeromy's avatar
Jeromy committed
541
	}
542

Jeromy's avatar
Jeromy committed
543
}