merkledag.go 11.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

14
	ipldcbor "gx/ipfs/QmNrbCt8j9DT5W9Pmjy2SdudT9k8GpaDr4sRuFix3BXhgR/go-ipld-cbor"
Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
16 17
	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
	node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40 41 42
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
43
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
44 45

	GetOfflineLinkService() LinkService
46 47
}

48
func NewDAGService(bs bserv.BlockService) *dagService {
49
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
50 51
}

52
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
53 54
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
55 56
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
57
type dagService struct {
58
	Blocks bserv.BlockService
59 60
}

61
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
62
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
63
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
64
		return nil, fmt.Errorf("dagService is nil")
65 66
	}

67
	return n.Blocks.AddBlock(nd)
68 69
}

70
func (n *dagService) Batch() *Batch {
71 72 73 74 75 76 77 78 79
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
80 81
}

82
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
83
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
84
	if n == nil {
85
		return nil, fmt.Errorf("dagService is nil")
86
	}
Jeromy's avatar
Jeromy committed
87

88 89
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
90

Jeromy's avatar
Jeromy committed
91
	b, err := n.Blocks.GetBlock(ctx, c)
92
	if err != nil {
93 94 95
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
96
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
97 98
	}

99 100 101 102 103 104
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
105
	switch c.Type() {
Jeromy's avatar
Jeromy committed
106
	case cid.DagProtobuf:
107
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
108 109 110 111 112
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
113
		}
114 115

		decnd.cached = b.Cid()
116
		decnd.Prefix = b.Cid().Prefix()
117 118
		return decnd, nil
	case cid.Raw:
119
		return NewRawNodeWPrefix(b.RawData(), b.Cid().Prefix())
Jeromy's avatar
Jeromy committed
120
	case cid.DagCBOR:
121
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
122
	default:
123
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
124
	}
125
}
Jeromy's avatar
Jeromy committed
126

127 128
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
129
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
130 131 132
	if c.Type() == cid.Raw {
		return nil, nil
	}
133 134 135 136
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
137
	return node.Links(), nil
138 139
}

140
func (n *dagService) GetOfflineLinkService() LinkService {
141 142
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
143 144 145 146 147 148
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
149
func (n *dagService) Remove(nd node.Node) error {
150
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
151 152
}

153 154 155
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
156 157 158 159 160 161 162 163 164 165
func GetLinksDirect(serv DAGService) GetLinks {
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return node.Links(), nil
	}
}

166
// FetchGraph fetches all nodes that are children of the given node
167 168 169
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
170
		return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
171 172 173 174 175 176 177 178 179 180
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
181
	return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
Jeromy's avatar
Jeromy committed
182
}
183

Jeromy's avatar
Jeromy committed
184 185
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
186
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
187
	var out []int
Jeromy's avatar
Jeromy committed
188 189
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
190
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
191 192
		}
	}
Jeromy's avatar
Jeromy committed
193
	return out
Jeromy's avatar
Jeromy committed
194 195
}

196
type NodeOption struct {
Jeromy's avatar
Jeromy committed
197
	Node node.Node
198 199 200
	Err  error
}

Jeromy's avatar
Jeromy committed
201
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
202
	out := make(chan *NodeOption, len(keys))
203
	blocks := ds.Blocks.GetBlocks(ctx, keys)
204 205
	var count int

206 207 208 209 210 211
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
212
					if count != len(keys) {
213
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
214
					}
215 216
					return
				}
Jeromy's avatar
Jeromy committed
217

218 219 220
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
221 222
					return
				}
Jeromy's avatar
Jeromy committed
223

224
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
225 226
				count++

227
			case <-ctx.Done():
228
				out <- &NodeOption{Err: ctx.Err()}
229 230 231 232
				return
			}
		}
	}()
233
	return out
234 235
}

236
// GetDAG will fill out all of the links of the given Node.
237 238
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
239
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
240
	var cids []*cid.Cid
241 242
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
243 244
	}

Jeromy's avatar
Jeromy committed
245
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
246 247
}

Jeromy's avatar
Jeromy committed
248 249
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
250
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
251 252 253 254 255 256

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
257
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
258
	for i := range keys {
259
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
260 261
	}

262
	dedupedKeys := dedupeKeys(keys)
263
	go func() {
264 265 266
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

267
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
268

269
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
270
			select {
271
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
272
				if !ok {
Jeromy's avatar
Jeromy committed
273 274 275
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
276 277
					return
				}
Jeromy's avatar
Jeromy committed
278

279
				if opt.Err != nil {
280 281 282
					for _, p := range promises {
						p.Fail(opt.Err)
					}
283 284 285 286
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
287
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
288
				for _, i := range is {
289
					count++
290
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
291 292 293
				}
			case <-ctx.Done():
				return
294 295 296
			}
		}
	}()
Jeromy's avatar
Jeromy committed
297 298 299
	return promises
}

300
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
301 302 303 304
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
305
	}
Jeromy's avatar
Jeromy committed
306
	return set.Keys()
307 308
}

309
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
310
	return &nodePromise{
Jeromy's avatar
Jeromy committed
311
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
312
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
313
		err:  make(chan error, 1),
314
	}
Jeromy's avatar
Jeromy committed
315 316 317
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
318
	cache node.Node
319
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
320
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
321
	ctx   context.Context
Jeromy's avatar
Jeromy committed
322
	err   chan error
Jeromy's avatar
Jeromy committed
323 324
}

Jeromy's avatar
Jeromy committed
325 326 327 328
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
329
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
330
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
331
	Fail(err error)
Jeromy's avatar
Jeromy committed
332
	Send(node.Node)
Jeromy's avatar
Jeromy committed
333 334 335
}

func (np *nodePromise) Fail(err error) {
336 337 338 339 340 341 342 343 344
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
345
	np.err <- err
Jeromy's avatar
Jeromy committed
346 347
}

Jeromy's avatar
Jeromy committed
348
func (np *nodePromise) Send(nd node.Node) {
349 350
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
351
	if np.cache != nil {
352 353 354 355 356 357 358 359 360 361 362 363
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
364
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
365 366 367 368 369
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
370 371 372
	}

	select {
373 374
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
375 376
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
377 378
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
379 380
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
381
	}
382
}
383 384 385 386

type Batch struct {
	ds *dagService

387 388 389 390
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
391 392
}

Jeromy's avatar
Jeromy committed
393
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
394
	t.blocks = append(t.blocks, nd)
395
	t.size += len(nd.RawData())
396
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
397
		return nd.Cid(), t.Commit()
398
	}
Jeromy's avatar
Jeromy committed
399
	return nd.Cid(), nil
400 401 402
}

func (t *Batch) Commit() error {
403 404
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
405 406 407
	t.size = 0
	return err
}
408

409 410
// GetLinks resolves the links of the node identified by a CID. It is the
// traversal hook taken by EnumerateChildren and EnumerateChildrenAsync.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

411 412 413
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
414 415 416
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
417 418
		return err
	}
419
	for _, lnk := range links {
420
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
421
		if visit(c) {
422
			err = EnumerateChildren(ctx, getLinks, c, visit)
423 424 425 426 427 428 429
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
430

431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
// ProgressTracker is a mutex-guarded counter of visited nodes. Attach it to
// a context with DeriveContext so FetchGraph can report progress through it.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the "progress"
// key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", p)
	return derived
}

// Increment adds one to the running total.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total = p.Total + 1
	p.lk.Unlock()
}

// Value reports the current total.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

452 453 454
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync starts to resolve links concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
455

456
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
457
	feed := make(chan *cid.Cid)
458
	out := make(chan []*node.Link)
459 460 461
	done := make(chan struct{})

	var setlk sync.Mutex
462

463 464
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
465

466
	defer cancel()
467

468 469
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
470
			for ic := range feed {
471
				links, err := getLinks(ctx, ic)
472 473 474
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
475
				}
476

477 478 479
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
480

481
				if unseen {
482
					select {
483
					case out <- links:
484
					case <-fetchersCtx.Done():
485 486 487
						return
					}
				}
Jeromy's avatar
Jeromy committed
488
				select {
489
				case done <- struct{}{}:
490
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
491 492
				}
			}
493
		}()
Jeromy's avatar
Jeromy committed
494
	}
495
	defer close(feed)
Jeromy's avatar
Jeromy committed
496

497
	send := feed
498
	var todobuffer []*cid.Cid
499
	var inProgress int
Jeromy's avatar
Jeromy committed
500

501
	next := c
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
518 519
		case links := <-out:
			for _, lnk := range links {
520 521 522 523 524 525
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
526
			}
527 528
		case err := <-errChan:
			return err
529

530
		case <-ctx.Done():
531
			return ctx.Err()
532
		}
Jeromy's avatar
Jeromy committed
533
	}
534

Jeromy's avatar
Jeromy committed
535
}