merkledag.go 11.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

Arthur Elliott's avatar
Arthur Elliott committed
9 10
	blocks "gx/ipfs/QmVA4mafxbfH5aEvNz8fyoxC6J1xhAtw88B4GerPznSZBg/go-block-format"

11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

14
	cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
Steven Allen's avatar
Steven Allen committed
15
	ipldcbor "gx/ipfs/QmXgUVPAxjMLZSyxx818YstJJAoRg3nyPWENmBLVzLtoax/go-ipld-cbor"
16
	node "gx/ipfs/QmYNyRZJBUYPNrLszFmrBrPJbsBh2vMsefz5gnDpB5M1P6/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

19 20 21 22 23 24 25 26 27
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
//
// init registers one block decoder with the node package for each of the
// DagProtobuf, Raw and DagCBOR CID codecs.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

28
// ErrNotFound is the sentinel error this package reports when a requested
// node could not be found.
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
29

Jeromy's avatar
Jeromy committed
30 31
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
32 33 34
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
35

Arthur Elliott's avatar
Arthur Elliott committed
36 37
	// GetMany returns a channel of NodeOption given
	// a set of CIDs
Jeromy's avatar
Jeromy committed
38
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
39 40

	Batch() *Batch
41 42

	LinkService
Jeromy's avatar
Jeromy committed
43 44
}

45
type LinkService interface {
46 47 48 49
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
50
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
51 52

	GetOfflineLinkService() LinkService
53 54
}

55
func NewDAGService(bs bserv.BlockService) *dagService {
56
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
57 58
}

59
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
60 61
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
62 63
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
64
type dagService struct {
65
	Blocks bserv.BlockService
66 67
}

68
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
69
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
70
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
71
		return nil, fmt.Errorf("dagService is nil")
72 73
	}

74
	return n.Blocks.AddBlock(nd)
75 76
}

77
func (n *dagService) Batch() *Batch {
78 79 80 81 82 83 84 85 86
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
87 88
}

89
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
90
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
91
	if n == nil {
92
		return nil, fmt.Errorf("dagService is nil")
93
	}
Jeromy's avatar
Jeromy committed
94

95 96
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
97

Jeromy's avatar
Jeromy committed
98
	b, err := n.Blocks.GetBlock(ctx, c)
99
	if err != nil {
100 101 102
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
103
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
104 105
	}

106
	return node.Decode(b)
107
}
Jeromy's avatar
Jeromy committed
108

109 110
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
111
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
112 113 114
	if c.Type() == cid.Raw {
		return nil, nil
	}
115 116 117 118
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
119
	return node.Links(), nil
120 121
}

122
func (n *dagService) GetOfflineLinkService() LinkService {
123 124
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
125 126 127 128 129 130
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
131
func (n *dagService) Remove(nd node.Node) error {
132
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
133 134
}

135 136 137
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
138
func GetLinksDirect(serv node.NodeGetter) GetLinks {
139 140 141
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
142 143 144
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
145 146 147 148 149 150
			return nil, err
		}
		return node.Links(), nil
	}
}

151 152 153 154 155 156 157 158 159 160
// sesGetter fetches and decodes nodes through a blockservice Session.
type sesGetter struct {
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
	if err != nil {
		return nil, err
	}

161
	return node.Decode(blk)
162 163
}

164
// FetchGraph fetches all nodes that are children of the given node
165
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
166 167 168
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
169
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
170 171
	}

172 173
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
174
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
175 176 177 178 179 180 181 182 183 184
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
185
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
186
}
187

Jeromy's avatar
Jeromy committed
188 189
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
190
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
191
	var out []int
Jeromy's avatar
Jeromy committed
192 193
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
194
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
195 196
		}
	}
Jeromy's avatar
Jeromy committed
197
	return out
Jeromy's avatar
Jeromy committed
198 199
}

200
type NodeOption struct {
Jeromy's avatar
Jeromy committed
201
	Node node.Node
202 203 204
	Err  error
}

Jeromy's avatar
Jeromy committed
205
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
206
	out := make(chan *NodeOption, len(keys))
207
	blocks := ds.Blocks.GetBlocks(ctx, keys)
208 209
	var count int

210 211 212 213 214 215
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
216
					if count != len(keys) {
217
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
218
					}
219 220
					return
				}
Jeromy's avatar
Jeromy committed
221

222
				nd, err := node.Decode(b)
223 224
				if err != nil {
					out <- &NodeOption{Err: err}
225 226
					return
				}
Jeromy's avatar
Jeromy committed
227

228
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
229 230
				count++

231
			case <-ctx.Done():
232
				out <- &NodeOption{Err: ctx.Err()}
233 234 235 236
				return
			}
		}
	}()
237
	return out
238 239
}

240
// GetDAG will fill out all of the links of the given Node.
241 242
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
243
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
244
	var cids []*cid.Cid
245 246
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
247 248
	}

Jeromy's avatar
Jeromy committed
249
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
250 251
}

Jeromy's avatar
Jeromy committed
252 253
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
254
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
255 256 257 258 259 260

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
261
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
262
	for i := range keys {
263
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
264 265
	}

266
	dedupedKeys := dedupeKeys(keys)
267
	go func() {
268 269 270
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

271
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
272

273
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
274
			select {
275
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
276
				if !ok {
Jeromy's avatar
Jeromy committed
277 278 279
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
280 281
					return
				}
Jeromy's avatar
Jeromy committed
282

283
				if opt.Err != nil {
284 285 286
					for _, p := range promises {
						p.Fail(opt.Err)
					}
287 288 289 290
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
291
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
292
				for _, i := range is {
293
					count++
294
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
295 296 297
				}
			case <-ctx.Done():
				return
298 299 300
			}
		}
	}()
Jeromy's avatar
Jeromy committed
301 302 303
	return promises
}

304
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
305 306 307 308
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
309
	}
Jeromy's avatar
Jeromy committed
310
	return set.Keys()
311 312
}

313
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
314
	return &nodePromise{
Jeromy's avatar
Jeromy committed
315
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
316
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
317
		err:  make(chan error, 1),
318
	}
Jeromy's avatar
Jeromy committed
319 320 321
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
322
	cache node.Node
323
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
324
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
325
	ctx   context.Context
Jeromy's avatar
Jeromy committed
326
	err   chan error
Jeromy's avatar
Jeromy committed
327 328
}

Jeromy's avatar
Jeromy committed
329 330 331 332
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
333
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
334
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
335
	Fail(err error)
Jeromy's avatar
Jeromy committed
336
	Send(node.Node)
Jeromy's avatar
Jeromy committed
337 338 339
}

func (np *nodePromise) Fail(err error) {
340 341 342 343 344 345 346 347 348
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
349
	np.err <- err
Jeromy's avatar
Jeromy committed
350 351
}

Jeromy's avatar
Jeromy committed
352
func (np *nodePromise) Send(nd node.Node) {
353 354
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
355
	if np.cache != nil {
356 357 358 359 360 361 362 363 364 365 366 367
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
368
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
369 370 371 372 373
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
374 375 376
	}

	select {
377 378
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
379 380
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
381 382
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
383 384
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
385
	}
386
}
387 388 389 390

type Batch struct {
	ds *dagService

391 392 393 394
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
395 396
}

Jeromy's avatar
Jeromy committed
397
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
398
	t.blocks = append(t.blocks, nd)
399
	t.size += len(nd.RawData())
400
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
401
		return nd.Cid(), t.Commit()
402
	}
Jeromy's avatar
Jeromy committed
403
	return nd.Cid(), nil
404 405 406
}

func (t *Batch) Commit() error {
407 408
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
409 410 411
	t.size = 0
	return err
}
412

413 414
// GetLinks is a function that, given a context and a CID, returns the
// links of the node identified by that CID.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

415 416 417
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
418 419 420
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
421 422
		return err
	}
423
	for _, lnk := range links {
424
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
425
		if visit(c) {
426
			err = EnumerateChildren(ctx, getLinks, c, visit)
427 428 429 430 431 432 433
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
434

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
// ProgressTracker is a mutex-guarded counter that can be attached to a
// context so that code receiving the context can report progress.
type ProgressTracker struct {
	// Total is the number of increments so far; use Value to read it
	// under the lock.
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the
// "progress" key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	const key = "progress"
	return context.WithValue(ctx, key, p)
}

// Increment adds one to the counter.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current counter value.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

456 457 458
// FetchGraphConcurrency is the number of fetcher goroutines that
// EnumerateChildrenAsync starts for each call.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
459

460
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
461
	feed := make(chan *cid.Cid)
462
	out := make(chan []*node.Link)
463 464 465
	done := make(chan struct{})

	var setlk sync.Mutex
466

467 468
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
469

470
	defer cancel()
471

472 473
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
474
			for ic := range feed {
475
				links, err := getLinks(ctx, ic)
476 477 478
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
479
				}
480

481 482 483
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
484

485
				if unseen {
486
					select {
487
					case out <- links:
488
					case <-fetchersCtx.Done():
489 490 491
						return
					}
				}
Jeromy's avatar
Jeromy committed
492
				select {
493
				case done <- struct{}{}:
494
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
495 496
				}
			}
497
		}()
Jeromy's avatar
Jeromy committed
498
	}
499
	defer close(feed)
Jeromy's avatar
Jeromy committed
500

501
	send := feed
502
	var todobuffer []*cid.Cid
503
	var inProgress int
Jeromy's avatar
Jeromy committed
504

505
	next := c
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
522 523
		case links := <-out:
			for _, lnk := range links {
524 525 526 527 528 529
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
530
			}
531 532
		case err := <-errChan:
			return err
533

534
		case <-ctx.Done():
535
			return ctx.Err()
536
		}
Jeromy's avatar
Jeromy committed
537
	}
538

Jeromy's avatar
Jeromy committed
539
}