merkledag.go 11.4 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
11

12 13
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
Łukasz Magiera's avatar
Łukasz Magiera committed
14
	ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22 23 24 25
// init registers block decoders with the node (go-ipld-format) package for
// the DagProtobuf, Raw and DagCBOR codecs, so that node.Decode can be used
// on blocks of those types.
//
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

26
// ErrNotFound is the error returned by this package when a requested node
// cannot be found (for example when the block service reports
// bserv.ErrNotFound).
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
27

Jeromy's avatar
Jeromy committed
28 29
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
30 31 32
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
33

Arthur Elliott's avatar
Arthur Elliott committed
34
	// GetMany returns a channel of NodeOption given
Arthur Elliott's avatar
Arthur Elliott committed
35
	// a set of CIDs.
Jeromy's avatar
Jeromy committed
36
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
37 38

	Batch() *Batch
39 40

	LinkService
Jeromy's avatar
Jeromy committed
41 42
}

43
type LinkService interface {
44 45 46 47
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
48
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
49 50

	GetOfflineLinkService() LinkService
51 52
}

53
func NewDAGService(bs bserv.BlockService) *dagService {
54
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
55 56
}

57
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
58 59
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
60 61
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
62
type dagService struct {
63
	Blocks bserv.BlockService
64 65
}

66
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
67
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
68
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
69
		return nil, fmt.Errorf("dagService is nil")
70 71
	}

72
	return n.Blocks.AddBlock(nd)
73 74
}

75
func (n *dagService) Batch() *Batch {
76
	return &Batch{
Steven Allen's avatar
Steven Allen committed
77 78 79
		ds:            n,
		commitResults: make(chan error, ParallelBatchCommits),
		MaxSize:       8 << 20,
80 81 82 83 84 85

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
86 87
}

88
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
89
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
90
	if n == nil {
91
		return nil, fmt.Errorf("dagService is nil")
92
	}
Jeromy's avatar
Jeromy committed
93

94 95
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
96

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, c)
98
	if err != nil {
99 100 101
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
102
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
103 104
	}

105
	return node.Decode(b)
106
}
Jeromy's avatar
Jeromy committed
107

108 109
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
110
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
111 112 113
	if c.Type() == cid.Raw {
		return nil, nil
	}
114 115 116 117
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
118
	return node.Links(), nil
119 120
}

121
func (n *dagService) GetOfflineLinkService() LinkService {
122 123
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
124 125 126 127 128 129
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
130
func (n *dagService) Remove(nd node.Node) error {
131
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
132 133
}

134 135 136
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
137
func GetLinksDirect(serv node.NodeGetter) GetLinks {
138 139 140
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
141 142 143
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
144 145 146 147 148 149
			return nil, err
		}
		return node.Links(), nil
	}
}

150 151 152 153 154 155
// sesGetter fetches nodes through a block service session.
type sesGetter struct {
	// bs is the session used to fetch blocks.
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
156 157 158 159
	switch err {
	case bserv.ErrNotFound:
		return nil, ErrNotFound
	default:
160
		return nil, err
Jeromy's avatar
Jeromy committed
161 162
	case nil:
		// noop
163 164
	}

165
	return node.Decode(blk)
166 167
}

168
// FetchGraph fetches all nodes that are children of the given node
169
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
170 171 172
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
173
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
174 175
	}

176 177
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
178
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
179 180 181 182 183 184 185 186 187 188
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
189
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
190
}
191

Jeromy's avatar
Jeromy committed
192 193
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
194
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
195
	var out []int
Jeromy's avatar
Jeromy committed
196 197
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
198
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
199 200
		}
	}
Jeromy's avatar
Jeromy committed
201
	return out
Jeromy's avatar
Jeromy committed
202 203
}

204
type NodeOption struct {
Jeromy's avatar
Jeromy committed
205
	Node node.Node
206 207 208
	Err  error
}

Jeromy's avatar
Jeromy committed
209
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
210
	out := make(chan *NodeOption, len(keys))
211
	blocks := ds.Blocks.GetBlocks(ctx, keys)
212 213
	var count int

214 215 216 217 218 219
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
220
					if count != len(keys) {
221
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
222
					}
223 224
					return
				}
Jeromy's avatar
Jeromy committed
225

226
				nd, err := node.Decode(b)
227 228
				if err != nil {
					out <- &NodeOption{Err: err}
229 230
					return
				}
Jeromy's avatar
Jeromy committed
231

232
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
233 234
				count++

235
			case <-ctx.Done():
236
				out <- &NodeOption{Err: ctx.Err()}
237 238 239 240
				return
			}
		}
	}()
241
	return out
242 243
}

244
// GetDAG will fill out all of the links of the given Node.
245 246
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
247
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
248
	var cids []*cid.Cid
249 250
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
251 252
	}

Jeromy's avatar
Jeromy committed
253
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
254 255
}

Jeromy's avatar
Jeromy committed
256 257
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
258
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
259 260 261 262 263 264

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
265
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
266
	for i := range keys {
267
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
268 269
	}

270
	dedupedKeys := dedupeKeys(keys)
271
	go func() {
272 273 274
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

275
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
276

277
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
278
			select {
279
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
280
				if !ok {
Jeromy's avatar
Jeromy committed
281 282 283
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
284 285
					return
				}
Jeromy's avatar
Jeromy committed
286

287
				if opt.Err != nil {
288 289 290
					for _, p := range promises {
						p.Fail(opt.Err)
					}
291 292 293 294
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
295
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
296
				for _, i := range is {
297
					count++
298
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
299 300 301
				}
			case <-ctx.Done():
				return
302 303 304
			}
		}
	}()
Jeromy's avatar
Jeromy committed
305 306 307
	return promises
}

308
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
309 310 311 312
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
313
	}
Jeromy's avatar
Jeromy committed
314
	return set.Keys()
315 316
}

317
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
318
	return &nodePromise{
Jeromy's avatar
Jeromy committed
319
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
320
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
321
		err:  make(chan error, 1),
322
	}
Jeromy's avatar
Jeromy committed
323 324 325
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
326
	cache node.Node
327
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
328
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
329
	ctx   context.Context
Jeromy's avatar
Jeromy committed
330
	err   chan error
Jeromy's avatar
Jeromy committed
331 332
}

Jeromy's avatar
Jeromy committed
333 334 335 336
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
337
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
338
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
339
	Fail(err error)
Jeromy's avatar
Jeromy committed
340
	Send(node.Node)
Jeromy's avatar
Jeromy committed
341 342 343
}

func (np *nodePromise) Fail(err error) {
344 345 346 347 348 349 350 351 352
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
353
	np.err <- err
Jeromy's avatar
Jeromy committed
354 355
}

Jeromy's avatar
Jeromy committed
356
func (np *nodePromise) Send(nd node.Node) {
357 358
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
359
	if np.cache != nil {
360 361 362 363 364 365 366 367 368 369 370 371
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
372
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
373 374 375 376 377
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
378 379 380
	}

	select {
381 382
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
383 384
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
385 386
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
387 388
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
389
	}
390
}
391

392 393
// GetLinks is a function that returns the links of the node identified by
// the given CID.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

394 395 396
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
397 398 399
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
400 401
		return err
	}
402
	for _, lnk := range links {
403
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
404
		if visit(c) {
405
			err = EnumerateChildren(ctx, getLinks, c, visit)
406 407 408 409 410 411 412
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
413

414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
// ProgressTracker is a mutex-guarded counter that can be attached to a
// context so that code receiving the context can report progress.
type ProgressTracker struct {
	// Total is the current count; use Value for a synchronized read.
	Total int
	// lk guards Total in Increment and Value.
	lk sync.Mutex
}

// DeriveContext returns a child of ctx that carries the tracker under the
// "progress" key.
func (pt *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", pt)
	return derived
}

// Increment adds one to the counter.
func (pt *ProgressTracker) Increment() {
	pt.lk.Lock()
	pt.Total++
	pt.lk.Unlock()
}

// Value returns the current counter value.
func (pt *ProgressTracker) Value() int {
	pt.lk.Lock()
	total := pt.Total
	pt.lk.Unlock()
	return total
}

435 436 437
// FetchGraphConcurrency is the number of fetcher goroutines that
// EnumerateChildrenAsync starts for each call.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
438

439
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
440
	feed := make(chan *cid.Cid)
441
	out := make(chan []*node.Link)
442 443 444
	done := make(chan struct{})

	var setlk sync.Mutex
445

446 447
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
448

449
	defer cancel()
450

451 452
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
453
			for ic := range feed {
454
				links, err := getLinks(ctx, ic)
455 456 457
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
458
				}
459

460 461 462
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
463

464
				if unseen {
465
					select {
466
					case out <- links:
467
					case <-fetchersCtx.Done():
468 469 470
						return
					}
				}
Jeromy's avatar
Jeromy committed
471
				select {
472
				case done <- struct{}{}:
473
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
474 475
				}
			}
476
		}()
Jeromy's avatar
Jeromy committed
477
	}
478
	defer close(feed)
Jeromy's avatar
Jeromy committed
479

480
	send := feed
481
	var todobuffer []*cid.Cid
482
	var inProgress int
Jeromy's avatar
Jeromy committed
483

484
	next := c
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
501 502
		case links := <-out:
			for _, lnk := range links {
503 504 505 506 507 508
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
509
			}
510 511
		case err := <-errChan:
			return err
512

513
		case <-ctx.Done():
514
			return ctx.Err()
515
		}
Jeromy's avatar
Jeromy committed
516
	}
517

Jeromy's avatar
Jeromy committed
518
}