merkledag.go 11.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
11
	blocks "gx/ipfs/QmVA4mafxbfH5aEvNz8fyoxC6J1xhAtw88B4GerPznSZBg/go-block-format"
Jeromy's avatar
Jeromy committed
12

13 14 15
	cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
	node "gx/ipfs/QmYNyRZJBUYPNrLszFmrBrPJbsBh2vMsefz5gnDpB5M1P6/go-ipld-format"
	ipldcbor "gx/ipfs/QmemYymP73eVdTUUMZEiSpiHeZQKNJdT5dP2iuHssZh1sR/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19 20 21 22 23 24 25 26
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
//
// init registers block decoders for the DagProtobuf, Raw and DagCBOR codecs
// with go-ipld-format via node.Register. node.Decode, which this file uses to
// turn fetched blocks into nodes, presumably dispatches on these registrations
// (NOTE(review): confirm against go-ipld-format).
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

27
var (
	// ErrNotFound is returned in place of bserv.ErrNotFound when a requested
	// block cannot be obtained.
	ErrNotFound = fmt.Errorf("merkledag: not found")
)
Jeromy's avatar
Jeromy committed
28

Jeromy's avatar
Jeromy committed
29 30
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
31 32 33
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
34 35 36

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
37
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
38 39

	Batch() *Batch
40 41

	LinkService
Jeromy's avatar
Jeromy committed
42 43
}

44
type LinkService interface {
45 46 47 48
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
49
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
50 51

	GetOfflineLinkService() LinkService
52 53
}

54
func NewDAGService(bs bserv.BlockService) *dagService {
55
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
56 57
}

58
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
59 60
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
61 62
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
63
type dagService struct {
64
	Blocks bserv.BlockService
65 66
}

67
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
68
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
69
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
70
		return nil, fmt.Errorf("dagService is nil")
71 72
	}

73
	return n.Blocks.AddBlock(nd)
74 75
}

76
func (n *dagService) Batch() *Batch {
77 78 79 80 81 82 83 84 85
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
86 87
}

88
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
89
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
90
	if n == nil {
91
		return nil, fmt.Errorf("dagService is nil")
92
	}
Jeromy's avatar
Jeromy committed
93

94 95
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
96

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, c)
98
	if err != nil {
99 100 101
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
102
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
103 104
	}

105
	return node.Decode(b)
106
}
Jeromy's avatar
Jeromy committed
107

108 109
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
110
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
111 112 113
	if c.Type() == cid.Raw {
		return nil, nil
	}
114 115 116 117
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
118
	return node.Links(), nil
119 120
}

121
func (n *dagService) GetOfflineLinkService() LinkService {
122 123
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
124 125 126 127 128 129
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
130
func (n *dagService) Remove(nd node.Node) error {
131
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
132 133
}

134 135 136
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
137
func GetLinksDirect(serv node.NodeGetter) GetLinks {
138 139 140
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
141 142 143
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
144 145 146 147 148 149
			return nil, err
		}
		return node.Links(), nil
	}
}

150 151 152 153 154 155 156 157 158 159
type sesGetter struct {
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
	if err != nil {
		return nil, err
	}

160
	return node.Decode(blk)
161 162
}

163
// FetchGraph fetches all nodes that are children of the given node
164
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
165 166 167
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
168
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
169 170
	}

171 172
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
173
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
174 175 176 177 178 179 180 181 182 183
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
184
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
185
}
186

Jeromy's avatar
Jeromy committed
187 188
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
189
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
190
	var out []int
Jeromy's avatar
Jeromy committed
191 192
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
193
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
194 195
		}
	}
Jeromy's avatar
Jeromy committed
196
	return out
Jeromy's avatar
Jeromy committed
197 198
}

199
type NodeOption struct {
Jeromy's avatar
Jeromy committed
200
	Node node.Node
201 202 203
	Err  error
}

Jeromy's avatar
Jeromy committed
204
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
205
	out := make(chan *NodeOption, len(keys))
206
	blocks := ds.Blocks.GetBlocks(ctx, keys)
207 208
	var count int

209 210 211 212 213 214
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
215
					if count != len(keys) {
216
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
217
					}
218 219
					return
				}
Jeromy's avatar
Jeromy committed
220

221
				nd, err := node.Decode(b)
222 223
				if err != nil {
					out <- &NodeOption{Err: err}
224 225
					return
				}
Jeromy's avatar
Jeromy committed
226

227
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
228 229
				count++

230
			case <-ctx.Done():
231
				out <- &NodeOption{Err: ctx.Err()}
232 233 234 235
				return
			}
		}
	}()
236
	return out
237 238
}

239
// GetDAG will fill out all of the links of the given Node.
240 241
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
242
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
243
	var cids []*cid.Cid
244 245
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
246 247
	}

Jeromy's avatar
Jeromy committed
248
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
249 250
}

Jeromy's avatar
Jeromy committed
251 252
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
253
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
254 255 256 257 258 259

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
260
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
261
	for i := range keys {
262
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
263 264
	}

265
	dedupedKeys := dedupeKeys(keys)
266
	go func() {
267 268 269
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

270
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
271

272
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
273
			select {
274
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
275
				if !ok {
Jeromy's avatar
Jeromy committed
276 277 278
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
279 280
					return
				}
Jeromy's avatar
Jeromy committed
281

282
				if opt.Err != nil {
283 284 285
					for _, p := range promises {
						p.Fail(opt.Err)
					}
286 287 288 289
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
290
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
291
				for _, i := range is {
292
					count++
293
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
294 295 296
				}
			case <-ctx.Done():
				return
297 298 299
			}
		}
	}()
Jeromy's avatar
Jeromy committed
300 301 302
	return promises
}

303
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
304 305 306 307
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
308
	}
Jeromy's avatar
Jeromy committed
309
	return set.Keys()
310 311
}

312
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
313
	return &nodePromise{
Jeromy's avatar
Jeromy committed
314
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
315
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
316
		err:  make(chan error, 1),
317
	}
Jeromy's avatar
Jeromy committed
318 319 320
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
321
	cache node.Node
322
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
323
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
324
	ctx   context.Context
Jeromy's avatar
Jeromy committed
325
	err   chan error
Jeromy's avatar
Jeromy committed
326 327
}

Jeromy's avatar
Jeromy committed
328 329 330 331
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
332
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
333
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
334
	Fail(err error)
Jeromy's avatar
Jeromy committed
335
	Send(node.Node)
Jeromy's avatar
Jeromy committed
336 337 338
}

func (np *nodePromise) Fail(err error) {
339 340 341 342 343 344 345 346 347
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
348
	np.err <- err
Jeromy's avatar
Jeromy committed
349 350
}

Jeromy's avatar
Jeromy committed
351
func (np *nodePromise) Send(nd node.Node) {
352 353
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
354
	if np.cache != nil {
355 356 357 358 359 360 361 362 363 364 365 366
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
367
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
368 369 370 371 372
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
373 374 375
	}

	select {
376 377
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
378 379
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
380 381
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
382 383
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
384
	}
385
}
386 387 388 389

type Batch struct {
	ds *dagService

390 391 392 393
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
394 395
}

Jeromy's avatar
Jeromy committed
396
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
397
	t.blocks = append(t.blocks, nd)
398
	t.size += len(nd.RawData())
399
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
400
		return nd.Cid(), t.Commit()
401
	}
Jeromy's avatar
Jeromy committed
402
	return nd.Cid(), nil
403 404 405
}

func (t *Batch) Commit() error {
406 407
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
408 409 410
	t.size = 0
	return err
}
411

412 413
// GetLinks returns the links of the node identified by a CID; it is the link
// source used by EnumerateChildren and EnumerateChildrenAsync.
type GetLinks func(ctx context.Context, c *cid.Cid) ([]*node.Link, error)

414 415 416
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
417 418 419
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
420 421
		return err
	}
422
	for _, lnk := range links {
423
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
424
		if visit(c) {
425
			err = EnumerateChildren(ctx, getLinks, c, visit)
426 427 428 429 430 431 432
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
433

434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
// ProgressTracker counts nodes visited during a graph fetch. Attach it to a
// context with DeriveContext; FetchGraph looks it up under the "progress" key
// and calls Increment for every newly visited CID.
type ProgressTracker struct {
	// Total is the running count. Read it through Value while other
	// goroutines may still be calling Increment.
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the "progress"
// key.
// NOTE(review): a plain string key can collide with other packages
// (staticcheck SA1029); it is kept because FetchGraph reads the same key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, "progress", p)
}

// Increment adds one to Total under the tracker's lock.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns Total, read under the tracker's lock.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

455 456 457
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts to fetch links
// concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
458

459
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
460
	feed := make(chan *cid.Cid)
461
	out := make(chan []*node.Link)
462 463 464
	done := make(chan struct{})

	var setlk sync.Mutex
465

466 467
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
468

469
	defer cancel()
470

471 472
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
473
			for ic := range feed {
474
				links, err := getLinks(ctx, ic)
475 476 477
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
478
				}
479

480 481 482
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
483

484
				if unseen {
485
					select {
486
					case out <- links:
487
					case <-fetchersCtx.Done():
488 489 490
						return
					}
				}
Jeromy's avatar
Jeromy committed
491
				select {
492
				case done <- struct{}{}:
493
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
494 495
				}
			}
496
		}()
Jeromy's avatar
Jeromy committed
497
	}
498
	defer close(feed)
Jeromy's avatar
Jeromy committed
499

500
	send := feed
501
	var todobuffer []*cid.Cid
502
	var inProgress int
Jeromy's avatar
Jeromy committed
503

504
	next := c
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
521 522
		case links := <-out:
			for _, lnk := range links {
523 524 525 526 527 528
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
529
			}
530 531
		case err := <-errChan:
			return err
532

533
		case <-ctx.Done():
534
			return ctx.Err()
535
		}
Jeromy's avatar
Jeromy committed
536
	}
537

Jeromy's avatar
Jeromy committed
538
}