merkledag.go 11.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
11

12
	cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
Arthur Elliott's avatar
Arthur Elliott committed
13
	blocks "gx/ipfs/QmVA4mafxbfH5aEvNz8fyoxC6J1xhAtw88B4GerPznSZBg/go-block-format"
14
	node "gx/ipfs/QmYNyRZJBUYPNrLszFmrBrPJbsBh2vMsefz5gnDpB5M1P6/go-ipld-format"
15
	ipldcbor "gx/ipfs/QmeebqVZeEXBqJ2B4urQWfdhwRRPm84ajnCo8x8pfwbsPM/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19 20 21 22 23 24 25 26
// init registers a block decoder for each CID codec this package handles:
// DagProtobuf blocks via DecodeProtobufBlock and Raw blocks via
// DecodeRawBlock (both defined elsewhere in this package), and DagCBOR blocks
// via go-ipld-cbor's DecodeBlock. These registrations are presumably what
// node.Decode (used by Get/GetMany below) dispatches on — confirm against
// go-ipld-format.
//
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

27
var (
	// ErrNotFound is returned when a requested node cannot be found; block
	// service "not found" errors are translated to it.
	ErrNotFound = fmt.Errorf("merkledag: not found")
)
Jeromy's avatar
Jeromy committed
28

Jeromy's avatar
Jeromy committed
29 30
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
31 32 33
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
34

Arthur Elliott's avatar
Arthur Elliott committed
35
	// GetMany returns a channel of NodeOption given
Arthur Elliott's avatar
Arthur Elliott committed
36
	// a set of CIDs.
Jeromy's avatar
Jeromy committed
37
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
38 39

	Batch() *Batch
40 41

	LinkService
Jeromy's avatar
Jeromy committed
42 43
}

44
type LinkService interface {
45 46 47 48
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
49
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
50 51

	GetOfflineLinkService() LinkService
52 53
}

54
func NewDAGService(bs bserv.BlockService) *dagService {
55
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
56 57
}

58
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
59 60
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
61 62
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
63
type dagService struct {
64
	Blocks bserv.BlockService
65 66
}

67
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
68
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
69
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
70
		return nil, fmt.Errorf("dagService is nil")
71 72
	}

73
	return n.Blocks.AddBlock(nd)
74 75
}

76
func (n *dagService) Batch() *Batch {
77 78 79 80 81 82 83 84 85
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
86 87
}

88
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
89
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
90
	if n == nil {
91
		return nil, fmt.Errorf("dagService is nil")
92
	}
Jeromy's avatar
Jeromy committed
93

94 95
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
96

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, c)
98
	if err != nil {
99 100 101
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
102
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
103 104
	}

105
	return node.Decode(b)
106
}
Jeromy's avatar
Jeromy committed
107

108 109
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
110
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
111 112 113
	if c.Type() == cid.Raw {
		return nil, nil
	}
114 115 116 117
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
118
	return node.Links(), nil
119 120
}

121
func (n *dagService) GetOfflineLinkService() LinkService {
122 123
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
124 125 126 127 128 129
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
130
func (n *dagService) Remove(nd node.Node) error {
131
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
132 133
}

134 135 136
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
137
func GetLinksDirect(serv node.NodeGetter) GetLinks {
138 139 140
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
141 142 143
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
144 145 146 147 148 149
			return nil, err
		}
		return node.Links(), nil
	}
}

150 151 152 153 154 155
type sesGetter struct {
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
156 157 158 159
	switch err {
	case bserv.ErrNotFound:
		return nil, ErrNotFound
	default:
160 161 162
		return nil, err
	}

163
	return node.Decode(blk)
164 165
}

166
// FetchGraph fetches all nodes that are children of the given node
167
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
168 169 170
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
171
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
172 173
	}

174 175
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
176
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
177 178 179 180 181 182 183 184 185 186
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
187
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
188
}
189

Jeromy's avatar
Jeromy committed
190 191
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
192
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
193
	var out []int
Jeromy's avatar
Jeromy committed
194 195
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
196
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
197 198
		}
	}
Jeromy's avatar
Jeromy committed
199
	return out
Jeromy's avatar
Jeromy committed
200 201
}

202
type NodeOption struct {
Jeromy's avatar
Jeromy committed
203
	Node node.Node
204 205 206
	Err  error
}

Jeromy's avatar
Jeromy committed
207
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
208
	out := make(chan *NodeOption, len(keys))
209
	blocks := ds.Blocks.GetBlocks(ctx, keys)
210 211
	var count int

212 213 214 215 216 217
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
218
					if count != len(keys) {
219
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
220
					}
221 222
					return
				}
Jeromy's avatar
Jeromy committed
223

224
				nd, err := node.Decode(b)
225 226
				if err != nil {
					out <- &NodeOption{Err: err}
227 228
					return
				}
Jeromy's avatar
Jeromy committed
229

230
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
231 232
				count++

233
			case <-ctx.Done():
234
				out <- &NodeOption{Err: ctx.Err()}
235 236 237 238
				return
			}
		}
	}()
239
	return out
240 241
}

242
// GetDAG will fill out all of the links of the given Node.
243 244
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
245
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
246
	var cids []*cid.Cid
247 248
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
249 250
	}

Jeromy's avatar
Jeromy committed
251
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
252 253
}

Jeromy's avatar
Jeromy committed
254 255
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
256
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
257 258 259 260 261 262

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
263
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
264
	for i := range keys {
265
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
266 267
	}

268
	dedupedKeys := dedupeKeys(keys)
269
	go func() {
270 271 272
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

273
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
274

275
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
276
			select {
277
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
278
				if !ok {
Jeromy's avatar
Jeromy committed
279 280 281
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
282 283
					return
				}
Jeromy's avatar
Jeromy committed
284

285
				if opt.Err != nil {
286 287 288
					for _, p := range promises {
						p.Fail(opt.Err)
					}
289 290 291 292
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
293
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
294
				for _, i := range is {
295
					count++
296
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
297 298 299
				}
			case <-ctx.Done():
				return
300 301 302
			}
		}
	}()
Jeromy's avatar
Jeromy committed
303 304 305
	return promises
}

306
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
307 308 309 310
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
311
	}
Jeromy's avatar
Jeromy committed
312
	return set.Keys()
313 314
}

315
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
316
	return &nodePromise{
Jeromy's avatar
Jeromy committed
317
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
318
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
319
		err:  make(chan error, 1),
320
	}
Jeromy's avatar
Jeromy committed
321 322 323
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
324
	cache node.Node
325
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
326
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
327
	ctx   context.Context
Jeromy's avatar
Jeromy committed
328
	err   chan error
Jeromy's avatar
Jeromy committed
329 330
}

Jeromy's avatar
Jeromy committed
331 332 333 334
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
335
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
336
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
337
	Fail(err error)
Jeromy's avatar
Jeromy committed
338
	Send(node.Node)
Jeromy's avatar
Jeromy committed
339 340 341
}

func (np *nodePromise) Fail(err error) {
342 343 344 345 346 347 348 349 350
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
351
	np.err <- err
Jeromy's avatar
Jeromy committed
352 353
}

Jeromy's avatar
Jeromy committed
354
func (np *nodePromise) Send(nd node.Node) {
355 356
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
357
	if np.cache != nil {
358 359 360 361 362 363 364 365 366 367 368 369
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
370
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
371 372 373 374 375
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
376 377 378
	}

	select {
379 380
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
381 382
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
383 384
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
385 386
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
387
	}
388
}
389 390 391 392

type Batch struct {
	ds *dagService

393 394 395 396
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
397 398
}

Jeromy's avatar
Jeromy committed
399
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
400
	t.blocks = append(t.blocks, nd)
401
	t.size += len(nd.RawData())
402
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
403
		return nd.Cid(), t.Commit()
404
	}
Jeromy's avatar
Jeromy committed
405
	return nd.Cid(), nil
406 407 408
}

func (t *Batch) Commit() error {
409 410
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
411 412 413
	t.size = 0
	return err
}
414

415 416
// GetLinks is the signature of a function returning the links of the node
// identified by a CID; GetLinksDirect builds one from a node.NodeGetter.
type GetLinks func(ctx context.Context, c *cid.Cid) ([]*node.Link, error)

417 418 419
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
420 421 422
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
423 424
		return err
	}
425
	for _, lnk := range links {
426
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
427
		if visit(c) {
428
			err = EnumerateChildren(ctx, getLinks, c, visit)
429 430 431 432 433 434 435
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
436

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
// ProgressTracker counts nodes visited by FetchGraph. Attach it to a context
// with DeriveContext. Increment and Value are safe to call concurrently.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx carrying p under the "progress" key,
// which is where FetchGraph looks for a tracker.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	const progressKey = "progress"
	return context.WithValue(ctx, progressKey, p)
}

// Increment adds one to the running total.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current total.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	n := p.Total
	p.lk.Unlock()
	return n
}

458 459 460
var (
	// FetchGraphConcurrency is the number of worker goroutines that
	// EnumerateChildrenAsync (and therefore FetchGraph) uses to fetch links
	// concurrently.
	FetchGraphConcurrency = 8
)
Jeromy's avatar
Jeromy committed
461

462
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
463
	feed := make(chan *cid.Cid)
464
	out := make(chan []*node.Link)
465 466 467
	done := make(chan struct{})

	var setlk sync.Mutex
468

469 470
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
471

472
	defer cancel()
473

474 475
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
476
			for ic := range feed {
477
				links, err := getLinks(ctx, ic)
478 479 480
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
481
				}
482

483 484 485
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
486

487
				if unseen {
488
					select {
489
					case out <- links:
490
					case <-fetchersCtx.Done():
491 492 493
						return
					}
				}
Jeromy's avatar
Jeromy committed
494
				select {
495
				case done <- struct{}{}:
496
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
497 498
				}
			}
499
		}()
Jeromy's avatar
Jeromy committed
500
	}
501
	defer close(feed)
Jeromy's avatar
Jeromy committed
502

503
	send := feed
504
	var todobuffer []*cid.Cid
505
	var inProgress int
Jeromy's avatar
Jeromy committed
506

507
	next := c
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
524 525
		case links := <-out:
			for _, lnk := range links {
526 527 528 529 530 531
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
532
			}
533 534
		case err := <-errChan:
			return err
535

536
		case <-ctx.Done():
537
			return ctx.Err()
538
		}
Jeromy's avatar
Jeromy committed
539
	}
540

Jeromy's avatar
Jeromy committed
541
}