merkledag.go 12.1 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	bserv "github.com/ipfs/go-ipfs/blockservice"
11
	offline "github.com/ipfs/go-ipfs/exchange/offline"
12
	blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
Jeromy's avatar
Jeromy committed
13

14 15 16
	node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format"
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
	ipldcbor "gx/ipfs/Qmcdid3XrCxcoNQUqZKiiKtM7JXxtyipU3izyRqwjFbVWw/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

19
// ErrNotFound is returned by dagService.Get when the BlockService reports
// that the requested block does not exist. GetNodes also uses it to fail
// promises whose nodes never arrived.
var (
	ErrNotFound = fmt.Errorf("merkledag: not found")
)
Jeromy's avatar
Jeromy committed
20

Jeromy's avatar
Jeromy committed
21 22
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
23 24 25
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
26 27 28

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
29
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
30 31

	Batch() *Batch
32 33

	LinkService
Jeromy's avatar
Jeromy committed
34 35
}

36
type LinkService interface {
37 38 39 40
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68
func (n *dagService) Batch() *Batch {
69 70 71 72 73 74 75 76 77
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
78 79
}

80
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
81
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
82
	if n == nil {
83
		return nil, fmt.Errorf("dagService is nil")
84
	}
Jeromy's avatar
Jeromy committed
85

86 87
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
88

Jeromy's avatar
Jeromy committed
89
	b, err := n.Blocks.GetBlock(ctx, c)
90
	if err != nil {
91 92 93
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
94
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
95 96
	}

97 98 99 100 101 102
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
103
	switch c.Type() {
Jeromy's avatar
Jeromy committed
104
	case cid.DagProtobuf:
105
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
106 107 108 109 110
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
111
		}
112 113

		decnd.cached = b.Cid()
114
		decnd.Prefix = b.Cid().Prefix()
115 116
		return decnd, nil
	case cid.Raw:
117
		return NewRawNodeWPrefix(b.RawData(), b.Cid().Prefix())
Jeromy's avatar
Jeromy committed
118
	case cid.DagCBOR:
119
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
120
	default:
121
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
122
	}
123
}
Jeromy's avatar
Jeromy committed
124

125 126
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
127
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
128 129 130
	if c.Type() == cid.Raw {
		return nil, nil
	}
131 132 133 134
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
135
	return node.Links(), nil
136 137
}

138
func (n *dagService) GetOfflineLinkService() LinkService {
139 140
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
141 142 143 144 145 146
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
147
func (n *dagService) Remove(nd node.Node) error {
148
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
149 150
}

151 152 153
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
154
func GetLinksDirect(serv node.NodeGetter) GetLinks {
155 156 157 158 159 160 161 162 163
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return node.Links(), nil
	}
}

164 165 166 167 168 169 170 171 172 173 174 175 176
// sesGetter implements node.NodeGetter on top of a BlockService session,
// so that a whole graph walk (see FetchGraph) can share one session.
type sesGetter struct {
	bs *bserv.Session
}

// Get fetches the block for c through the session and decodes it. Fetch
// errors are returned unchanged.
func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
	if err == nil {
		return decodeBlock(blk)
	}
	return nil, err
}

177
// FetchGraph fetches all nodes that are children of the given node
178
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
179 180 181 182 183 184
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
		ng = &sesGetter{ds.Blocks.NewSession(ctx)}
	}

185 186
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
187
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
188 189 190 191 192 193 194 195 196 197
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
198
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
199
}
200

Jeromy's avatar
Jeromy committed
201 202
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
203
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
204
	var out []int
Jeromy's avatar
Jeromy committed
205 206
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
207
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
208 209
		}
	}
Jeromy's avatar
Jeromy committed
210
	return out
Jeromy's avatar
Jeromy committed
211 212
}

213
type NodeOption struct {
Jeromy's avatar
Jeromy committed
214
	Node node.Node
215 216 217
	Err  error
}

Jeromy's avatar
Jeromy committed
218
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
219
	out := make(chan *NodeOption, len(keys))
220
	blocks := ds.Blocks.GetBlocks(ctx, keys)
221 222
	var count int

223 224 225 226 227 228
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
229
					if count != len(keys) {
230
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
231
					}
232 233
					return
				}
Jeromy's avatar
Jeromy committed
234

235 236 237
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
238 239
					return
				}
Jeromy's avatar
Jeromy committed
240

241
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
242 243
				count++

244
			case <-ctx.Done():
245
				out <- &NodeOption{Err: ctx.Err()}
246 247 248 249
				return
			}
		}
	}()
250
	return out
251 252
}

253
// GetDAG will fill out all of the links of the given Node.
254 255
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
256
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
257
	var cids []*cid.Cid
258 259
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
260 261
	}

Jeromy's avatar
Jeromy committed
262
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
263 264
}

Jeromy's avatar
Jeromy committed
265 266
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
267
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
268 269 270 271 272 273

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
274
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
275
	for i := range keys {
276
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
277 278
	}

279
	dedupedKeys := dedupeKeys(keys)
280
	go func() {
281 282 283
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

284
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
285

286
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
287
			select {
288
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
289
				if !ok {
Jeromy's avatar
Jeromy committed
290 291 292
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
293 294
					return
				}
Jeromy's avatar
Jeromy committed
295

296
				if opt.Err != nil {
297 298 299
					for _, p := range promises {
						p.Fail(opt.Err)
					}
300 301 302 303
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
304
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
305
				for _, i := range is {
306
					count++
307
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
308 309 310
				}
			case <-ctx.Done():
				return
311 312 313
			}
		}
	}()
Jeromy's avatar
Jeromy committed
314 315 316
	return promises
}

317
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
318 319 320 321
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
322
	}
Jeromy's avatar
Jeromy committed
323
	return set.Keys()
324 325
}

326
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
327
	return &nodePromise{
Jeromy's avatar
Jeromy committed
328
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
329
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
330
		err:  make(chan error, 1),
331
	}
Jeromy's avatar
Jeromy committed
332 333 334
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
335
	cache node.Node
336
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
337
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
338
	ctx   context.Context
Jeromy's avatar
Jeromy committed
339
	err   chan error
Jeromy's avatar
Jeromy committed
340 341
}

Jeromy's avatar
Jeromy committed
342 343 344 345
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
346
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
347
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
348
	Fail(err error)
Jeromy's avatar
Jeromy committed
349
	Send(node.Node)
Jeromy's avatar
Jeromy committed
350 351 352
}

func (np *nodePromise) Fail(err error) {
353 354 355 356 357 358 359 360 361
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
362
	np.err <- err
Jeromy's avatar
Jeromy committed
363 364
}

Jeromy's avatar
Jeromy committed
365
func (np *nodePromise) Send(nd node.Node) {
366 367
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
368
	if np.cache != nil {
369 370 371 372 373 374 375 376 377 378 379 380
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
381
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
382 383 384 385 386
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
387 388 389
	}

	select {
390 391
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
392 393
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
394 395
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
396 397
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
398
	}
399
}
400 401 402 403

type Batch struct {
	ds *dagService

404 405 406 407
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
408 409
}

Jeromy's avatar
Jeromy committed
410
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
411
	t.blocks = append(t.blocks, nd)
412
	t.size += len(nd.RawData())
413
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
414
		return nd.Cid(), t.Commit()
415
	}
Jeromy's avatar
Jeromy committed
416
	return nd.Cid(), nil
417 418 419
}

func (t *Batch) Commit() error {
420 421
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
422 423 424
	t.size = 0
	return err
}
425

426 427
// GetLinks is a function that returns the links of the node identified by
// a CID. It has the same signature as LinkService.GetLinks; GetLinksDirect
// builds one from a node.NodeGetter.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

428 429 430
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
431 432 433
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
434 435
		return err
	}
436
	for _, lnk := range links {
437
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
438
		if visit(c) {
439
			err = EnumerateChildren(ctx, getLinks, c, visit)
440 441 442 443 444 445 446
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
447

448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
// ProgressTracker counts visited nodes. FetchGraph looks for it in the
// context under the "progress" key and calls Increment once per newly
// visited node. Increment and Value are guarded by an internal mutex.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the
// "progress" key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", p)
	return derived
}

// Increment adds one to Total.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current Total.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

469 470 471
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts to fetch nodes
// concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
472

473
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
474
	feed := make(chan *cid.Cid)
475
	out := make(chan []*node.Link)
476 477 478
	done := make(chan struct{})

	var setlk sync.Mutex
479

480 481
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
482

483
	defer cancel()
484

485 486
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
487
			for ic := range feed {
488
				links, err := getLinks(ctx, ic)
489 490 491
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
492
				}
493

494 495 496
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
497

498
				if unseen {
499
					select {
500
					case out <- links:
501
					case <-fetchersCtx.Done():
502 503 504
						return
					}
				}
Jeromy's avatar
Jeromy committed
505
				select {
506
				case done <- struct{}{}:
507
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
508 509
				}
			}
510
		}()
Jeromy's avatar
Jeromy committed
511
	}
512
	defer close(feed)
Jeromy's avatar
Jeromy committed
513

514
	send := feed
515
	var todobuffer []*cid.Cid
516
	var inProgress int
Jeromy's avatar
Jeromy committed
517

518
	next := c
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
535 536
		case links := <-out:
			for _, lnk := range links {
537 538 539 540 541 542
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
543
			}
544 545
		case err := <-errChan:
			return err
546

547
		case <-ctx.Done():
548
			return ctx.Err()
549
		}
Jeromy's avatar
Jeromy committed
550
	}
551

Jeromy's avatar
Jeromy committed
552
}