merkledag.go 11.6 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
11

Steven Allen's avatar
Steven Allen committed
12 13 14
	ipldcbor "gx/ipfs/QmNRz7BDWfdFNVLt7AVvmRefkrURD25EeoipcXqo6yoXU1/go-ipld-cbor"
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
	node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22 23 24 25
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
//
// init registers a block decoder with go-ipld-format for each of the dag-pb,
// raw and dag-cbor codecs. Presumably this is what lets node.Decode (used by
// Get, GetMany and sesGetter in this file) dispatch on a block's codec.
// Verify against go-ipld-format's Register/Decode.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

26
// ErrNotFound is this package's "node not available" error. The dagService
// getters and the GetNodes promises return it in place of the blockservice's
// bserv.ErrNotFound.
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
27

Jeromy's avatar
Jeromy committed
28 29
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
ForrestWeston's avatar
ForrestWeston committed
30
	// Add adds the node to the DAGService
Jeromy's avatar
Jeromy committed
31
	Add(node.Node) (*cid.Cid, error)
ForrestWeston's avatar
ForrestWeston committed
32
	// Get gets the node the from the DAGService
Jeromy's avatar
Jeromy committed
33
	Get(context.Context, *cid.Cid) (node.Node, error)
ForrestWeston's avatar
ForrestWeston committed
34
	// Remove removes the node from the DAGService
Jeromy's avatar
Jeromy committed
35
	Remove(node.Node) error
36

Arthur Elliott's avatar
Arthur Elliott committed
37
	// GetMany returns a channel of NodeOption given
Arthur Elliott's avatar
Arthur Elliott committed
38
	// a set of CIDs.
Jeromy's avatar
Jeromy committed
39
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
40

ForrestWeston's avatar
ForrestWeston committed
41
	// Batch is a buffer for batching adds to a dag.
42
	Batch() *Batch
43 44

	LinkService
Jeromy's avatar
Jeromy committed
45 46
}

47
type LinkService interface {
48 49 50 51
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
52
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
53 54

	GetOfflineLinkService() LinkService
55 56
}

57
func NewDAGService(bs bserv.BlockService) *dagService {
58
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
59 60
}

61
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
62 63
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
64 65
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
66
type dagService struct {
67
	Blocks bserv.BlockService
68 69
}

70
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
71
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
72
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
73
		return nil, fmt.Errorf("dagService is nil")
74 75
	}

76
	return n.Blocks.AddBlock(nd)
77 78
}

79
func (n *dagService) Batch() *Batch {
80
	return &Batch{
Steven Allen's avatar
Steven Allen committed
81 82 83
		ds:            n,
		commitResults: make(chan error, ParallelBatchCommits),
		MaxSize:       8 << 20,
84 85 86 87 88 89

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
90 91
}

92
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
93
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
94
	if n == nil {
95
		return nil, fmt.Errorf("dagService is nil")
96
	}
Jeromy's avatar
Jeromy committed
97

98 99
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
100

Jeromy's avatar
Jeromy committed
101
	b, err := n.Blocks.GetBlock(ctx, c)
102
	if err != nil {
103 104 105
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
106
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
107 108
	}

109
	return node.Decode(b)
110
}
Jeromy's avatar
Jeromy committed
111

112 113
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
114
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
115 116 117
	if c.Type() == cid.Raw {
		return nil, nil
	}
118 119 120 121
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
122
	return node.Links(), nil
123 124
}

125
func (n *dagService) GetOfflineLinkService() LinkService {
126 127
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
128 129 130 131 132 133
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
134
func (n *dagService) Remove(nd node.Node) error {
135
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
136 137
}

138 139 140
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
141
func GetLinksDirect(serv node.NodeGetter) GetLinks {
142 143 144
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
Jeromy's avatar
Jeromy committed
145 146 147
			if err == bserv.ErrNotFound {
				err = ErrNotFound
			}
148 149 150 151 152 153
			return nil, err
		}
		return node.Links(), nil
	}
}

154 155 156 157 158 159
type sesGetter struct {
	bs *bserv.Session
}

func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
160 161 162 163
	switch err {
	case bserv.ErrNotFound:
		return nil, ErrNotFound
	default:
164
		return nil, err
Jeromy's avatar
Jeromy committed
165 166
	case nil:
		// noop
167 168
	}

169
	return node.Decode(blk)
170 171
}

172
// FetchGraph fetches all nodes that are children of the given node
173
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
174 175 176
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
177
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
178 179
	}

180 181
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
182
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
183 184 185 186 187 188 189 190 191 192
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
193
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
194
}
195

Jeromy's avatar
Jeromy committed
196 197
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
198
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
199
	var out []int
Jeromy's avatar
Jeromy committed
200 201
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
202
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
203 204
		}
	}
Jeromy's avatar
Jeromy committed
205
	return out
Jeromy's avatar
Jeromy committed
206 207
}

208
type NodeOption struct {
Jeromy's avatar
Jeromy committed
209
	Node node.Node
210 211 212
	Err  error
}

Jeromy's avatar
Jeromy committed
213
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
214
	out := make(chan *NodeOption, len(keys))
215
	blocks := ds.Blocks.GetBlocks(ctx, keys)
216 217
	var count int

218 219 220 221 222 223
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
224
					if count != len(keys) {
225
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
226
					}
227 228
					return
				}
Jeromy's avatar
Jeromy committed
229

230
				nd, err := node.Decode(b)
231 232
				if err != nil {
					out <- &NodeOption{Err: err}
233 234
					return
				}
Jeromy's avatar
Jeromy committed
235

236
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
237 238
				count++

239
			case <-ctx.Done():
240
				out <- &NodeOption{Err: ctx.Err()}
241 242 243 244
				return
			}
		}
	}()
245
	return out
246 247
}

248
// GetDAG will fill out all of the links of the given Node.
249 250
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
251
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
252
	var cids []*cid.Cid
253 254
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
255 256
	}

Jeromy's avatar
Jeromy committed
257
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
258 259
}

Jeromy's avatar
Jeromy committed
260 261
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
262
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
263 264 265 266 267 268

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
269
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
270
	for i := range keys {
271
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
272 273
	}

274
	dedupedKeys := dedupeKeys(keys)
275
	go func() {
276 277 278
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

279
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
280

281
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
282
			select {
283
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
284
				if !ok {
Jeromy's avatar
Jeromy committed
285 286 287
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
288 289
					return
				}
Jeromy's avatar
Jeromy committed
290

291
				if opt.Err != nil {
292 293 294
					for _, p := range promises {
						p.Fail(opt.Err)
					}
295 296 297 298
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
299
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
300
				for _, i := range is {
301
					count++
302
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
303 304 305
				}
			case <-ctx.Done():
				return
306 307 308
			}
		}
	}()
Jeromy's avatar
Jeromy committed
309 310 311
	return promises
}

312
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
313
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
314
	out := make([]*cid.Cid, 0, len(cids))
Jeromy's avatar
Jeromy committed
315 316
	set := cid.NewSet()
	for _, c := range cids {
317 318 319
		if set.Visit(c) {
			out = append(out, c)
		}
320
	}
321
	return out
322 323
}

324
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
325
	return &nodePromise{
Jeromy's avatar
Jeromy committed
326
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
327
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
328
		err:  make(chan error, 1),
329
	}
Jeromy's avatar
Jeromy committed
330 331 332
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
333
	cache node.Node
334
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
335
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
336
	ctx   context.Context
Jeromy's avatar
Jeromy committed
337
	err   chan error
Jeromy's avatar
Jeromy committed
338 339
}

Jeromy's avatar
Jeromy committed
340 341 342 343
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
344
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
345
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
346
	Fail(err error)
Jeromy's avatar
Jeromy committed
347
	Send(node.Node)
Jeromy's avatar
Jeromy committed
348 349 350
}

func (np *nodePromise) Fail(err error) {
351 352 353 354 355 356 357 358 359
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
360
	np.err <- err
Jeromy's avatar
Jeromy committed
361 362
}

Jeromy's avatar
Jeromy committed
363
func (np *nodePromise) Send(nd node.Node) {
364 365
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
366
	if np.cache != nil {
367 368 369 370 371 372 373 374 375 376 377 378
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
379
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
380 381 382 383 384
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
385 386 387
	}

	select {
388 389
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
390 391
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
392 393
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
394 395
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
396
	}
397
}
398

399 400
// GetLinks returns the links of the node identified by the given CID. It is
// the traversal callback taken by EnumerateChildren and
// EnumerateChildrenAsync; GetLinksDirect builds one from a node.NodeGetter.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

401 402 403
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
404 405 406
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
407 408
		return err
	}
409
	for _, lnk := range links {
410
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
411
		if visit(c) {
412
			err = EnumerateChildren(ctx, getLinks, c, visit)
413 414 415 416 417 418 419
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
420

421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
// ProgressTracker counts the nodes visited by FetchGraph. Attach it to a
// context with DeriveContext; FetchGraph looks it up there and calls
// Increment once per newly visited node.
//
// Read the count through Value. Accessing Total directly bypasses the lock.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx carrying p. The "progress" key must
// match the key FetchGraph uses to look the tracker up.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", p)
	return derived
}

// Increment adds one to the count under the tracker's lock.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current count, read under the tracker's lock.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

442 443 444
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts to fetch links
// concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
445

446
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
447
	feed := make(chan *cid.Cid)
448
	out := make(chan []*node.Link)
449 450 451
	done := make(chan struct{})

	var setlk sync.Mutex
452

453 454
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
455

456
	defer cancel()
457

458 459
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
460
			for ic := range feed {
461
				links, err := getLinks(ctx, ic)
462 463 464
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
465
				}
466

467 468 469
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
470

471
				if unseen {
472
					select {
473
					case out <- links:
474
					case <-fetchersCtx.Done():
475 476 477
						return
					}
				}
Jeromy's avatar
Jeromy committed
478
				select {
479
				case done <- struct{}{}:
480
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
481 482
				}
			}
483
		}()
Jeromy's avatar
Jeromy committed
484
	}
485
	defer close(feed)
Jeromy's avatar
Jeromy committed
486

487
	send := feed
488
	var todobuffer []*cid.Cid
489
	var inProgress int
Jeromy's avatar
Jeromy committed
490

491
	next := c
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
508 509
		case links := <-out:
			for _, lnk := range links {
510 511 512 513 514 515
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
516
			}
517 518
		case err := <-errChan:
			return err
519

520
		case <-ctx.Done():
521
			return ctx.Err()
522
		}
Jeromy's avatar
Jeromy committed
523
	}
524

Jeromy's avatar
Jeromy committed
525
}