merkledag.go 11.7 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	bserv "github.com/ipfs/go-ipfs/blockservice"
11
	offline "github.com/ipfs/go-ipfs/exchange/offline"
12
	blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
Jeromy's avatar
Jeromy committed
13

14 15 16
	node "gx/ipfs/QmPAKbSsgEX5B6fpmxa61jXYnoWzZr5sNafd3qgPiSH8Uv/go-ipld-format"
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
	ipldcbor "gx/ipfs/Qmcdid3XrCxcoNQUqZKiiKtM7JXxtyipU3izyRqwjFbVWw/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

19
var (
	// ErrNotFound is returned when a requested node could not be found.
	ErrNotFound = fmt.Errorf("merkledag: not found")
)
Jeromy's avatar
Jeromy committed
20

Jeromy's avatar
Jeromy committed
21 22
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
23 24 25
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
26 27 28

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
29
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
30 31

	Batch() *Batch
32 33

	LinkService
Jeromy's avatar
Jeromy committed
34 35
}

36
type LinkService interface {
37 38 39 40
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68
func (n *dagService) Batch() *Batch {
69 70 71 72 73 74 75 76 77
	return &Batch{
		ds:      n,
		MaxSize: 8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxBlocks: 128,
	}
78 79
}

80
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
81
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
82
	if n == nil {
83
		return nil, fmt.Errorf("dagService is nil")
84
	}
Jeromy's avatar
Jeromy committed
85

86 87
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
88

Jeromy's avatar
Jeromy committed
89
	b, err := n.Blocks.GetBlock(ctx, c)
90
	if err != nil {
91 92 93
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
94
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
95 96
	}

97 98 99 100 101 102
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
103
	switch c.Type() {
Jeromy's avatar
Jeromy committed
104
	case cid.DagProtobuf:
105
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
106 107 108 109 110
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
111
		}
112 113

		decnd.cached = b.Cid()
114
		decnd.Prefix = b.Cid().Prefix()
115 116
		return decnd, nil
	case cid.Raw:
117
		return NewRawNodeWPrefix(b.RawData(), b.Cid().Prefix())
Jeromy's avatar
Jeromy committed
118
	case cid.DagCBOR:
119
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
120
	default:
121
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
122
	}
123
}
Jeromy's avatar
Jeromy committed
124

125 126
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
127
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
128 129 130
	if c.Type() == cid.Raw {
		return nil, nil
	}
131 132 133 134
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
135
	return node.Links(), nil
136 137
}

138
func (n *dagService) GetOfflineLinkService() LinkService {
139 140
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
141 142 143 144 145 146
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
147
func (n *dagService) Remove(nd node.Node) error {
148
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
149 150
}

151 152 153
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
154
func GetLinksDirect(serv node.NodeGetter) GetLinks {
155 156 157 158 159 160 161 162 163
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return node.Links(), nil
	}
}

164
// FetchGraph fetches all nodes that are children of the given node
165 166 167
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
168
		return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
169 170 171 172 173 174 175 176 177 178
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
179
	return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
Jeromy's avatar
Jeromy committed
180
}
181

Jeromy's avatar
Jeromy committed
182 183
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
184
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
185
	var out []int
Jeromy's avatar
Jeromy committed
186 187
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
188
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
189 190
		}
	}
Jeromy's avatar
Jeromy committed
191
	return out
Jeromy's avatar
Jeromy committed
192 193
}

194
type NodeOption struct {
Jeromy's avatar
Jeromy committed
195
	Node node.Node
196 197 198
	Err  error
}

Jeromy's avatar
Jeromy committed
199
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
200
	out := make(chan *NodeOption, len(keys))
201
	blocks := ds.Blocks.GetBlocks(ctx, keys)
202 203
	var count int

204 205 206 207 208 209
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
210
					if count != len(keys) {
211
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
212
					}
213 214
					return
				}
Jeromy's avatar
Jeromy committed
215

216 217 218
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
219 220
					return
				}
Jeromy's avatar
Jeromy committed
221

222
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
223 224
				count++

225
			case <-ctx.Done():
226
				out <- &NodeOption{Err: ctx.Err()}
227 228 229 230
				return
			}
		}
	}()
231
	return out
232 233
}

234
// GetDAG will fill out all of the links of the given Node.
235 236
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
237
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
238
	var cids []*cid.Cid
239 240
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
241 242
	}

Jeromy's avatar
Jeromy committed
243
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
244 245
}

Jeromy's avatar
Jeromy committed
246 247
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
248
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
249 250 251 252 253 254

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
255
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
256
	for i := range keys {
257
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
258 259
	}

260
	dedupedKeys := dedupeKeys(keys)
261
	go func() {
262 263 264
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

265
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
266

267
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
268
			select {
269
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
270
				if !ok {
Jeromy's avatar
Jeromy committed
271 272 273
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
274 275
					return
				}
Jeromy's avatar
Jeromy committed
276

277
				if opt.Err != nil {
278 279 280
					for _, p := range promises {
						p.Fail(opt.Err)
					}
281 282 283 284
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
285
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
286
				for _, i := range is {
287
					count++
288
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
289 290 291
				}
			case <-ctx.Done():
				return
292 293 294
			}
		}
	}()
Jeromy's avatar
Jeromy committed
295 296 297
	return promises
}

298
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
299 300 301 302
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
303
	}
Jeromy's avatar
Jeromy committed
304
	return set.Keys()
305 306
}

307
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
308
	return &nodePromise{
Jeromy's avatar
Jeromy committed
309
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
310
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
311
		err:  make(chan error, 1),
312
	}
Jeromy's avatar
Jeromy committed
313 314 315
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
316
	cache node.Node
317
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
318
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
319
	ctx   context.Context
Jeromy's avatar
Jeromy committed
320
	err   chan error
Jeromy's avatar
Jeromy committed
321 322
}

Jeromy's avatar
Jeromy committed
323 324 325 326
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
327
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
328
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
329
	Fail(err error)
Jeromy's avatar
Jeromy committed
330
	Send(node.Node)
Jeromy's avatar
Jeromy committed
331 332 333
}

func (np *nodePromise) Fail(err error) {
334 335 336 337 338 339 340 341 342
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
343
	np.err <- err
Jeromy's avatar
Jeromy committed
344 345
}

Jeromy's avatar
Jeromy committed
346
func (np *nodePromise) Send(nd node.Node) {
347 348
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
349
	if np.cache != nil {
350 351 352 353 354 355 356 357 358 359 360 361
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
362
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
363 364 365 366 367
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
368 369 370
	}

	select {
371 372
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
373 374
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
375 376
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
377 378
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
379
	}
380
}
381 382 383 384

type Batch struct {
	ds *dagService

385 386 387 388
	blocks    []blocks.Block
	size      int
	MaxSize   int
	MaxBlocks int
389 390
}

Jeromy's avatar
Jeromy committed
391
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
392
	t.blocks = append(t.blocks, nd)
393
	t.size += len(nd.RawData())
394
	if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
Jeromy's avatar
Jeromy committed
395
		return nd.Cid(), t.Commit()
396
	}
Jeromy's avatar
Jeromy committed
397
	return nd.Cid(), nil
398 399 400
}

func (t *Batch) Commit() error {
401 402
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
403 404 405
	t.size = 0
	return err
}
406

407 408
// GetLinks is a function that returns the links of the node identified by
// the given CID.
type GetLinks func(ctx context.Context, c *cid.Cid) ([]*node.Link, error)

409 410 411
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
412 413 414
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
415 416
		return err
	}
417
	for _, lnk := range links {
418
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
419
		if visit(c) {
420
			err = EnumerateChildren(ctx, getLinks, c, visit)
421 422 423 424 425 426 427
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
428

429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
// ProgressTracker counts progress events; FetchGraph increments it once per
// newly visited node. Increment and Value are safe for concurrent use.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx carrying p under the "progress" key,
// which is where FetchGraph looks for it.
// NOTE(review): a plain string key can collide with other packages' context
// values; changing it requires updating FetchGraph's lookup as well.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, "progress", p)
}

// Increment adds one to Total.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current Total.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	v := p.Total
	p.lk.Unlock()
	return v
}

450 451 452
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts to fetch links
// concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
453

454
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
455
	feed := make(chan *cid.Cid)
456
	out := make(chan []*node.Link)
457 458 459
	done := make(chan struct{})

	var setlk sync.Mutex
460

461 462
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
463

464
	defer cancel()
465

466 467
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
468
			for ic := range feed {
469
				links, err := getLinks(ctx, ic)
470 471 472
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
473
				}
474

475 476 477
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
478

479
				if unseen {
480
					select {
481
					case out <- links:
482
					case <-fetchersCtx.Done():
483 484 485
						return
					}
				}
Jeromy's avatar
Jeromy committed
486
				select {
487
				case done <- struct{}{}:
488
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
489 490
				}
			}
491
		}()
Jeromy's avatar
Jeromy committed
492
	}
493
	defer close(feed)
Jeromy's avatar
Jeromy committed
494

495
	send := feed
496
	var todobuffer []*cid.Cid
497
	var inProgress int
Jeromy's avatar
Jeromy committed
498

499
	next := c
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
516 517
		case links := <-out:
			for _, lnk := range links {
518 519 520 521 522 523
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
524
			}
525 526
		case err := <-errChan:
			return err
527

528
		case <-ctx.Done():
529
			return ctx.Err()
530
		}
Jeromy's avatar
Jeromy committed
531
	}
532

Jeromy's avatar
Jeromy committed
533
}