merkledag.go 11.5 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15 16
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
17
	ipldcbor "gx/ipfs/QmdaC21UyoyN3t9QdapHZfsaUo3mqVf5p4CEuFaYVFqwap/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40 41 42
	// GetLinks return all links for a node.  The complete node does not
	// necessarily have to exist locally, or at all.  For example, raw
	// leaves cannot possibly have links so there is no need to look
	// at the node.
Jeromy's avatar
Jeromy committed
43
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
44 45

	GetOfflineLinkService() LinkService
46 47
}

48
func NewDAGService(bs bserv.BlockService) *dagService {
49
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
50 51
}

52
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
53 54
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
55 56
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
57
type dagService struct {
58
	Blocks bserv.BlockService
59 60
}

61
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
62
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
63
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
64
		return nil, fmt.Errorf("dagService is nil")
65 66
	}

67
	return n.Blocks.AddBlock(nd)
68 69
}

70 71 72 73
// Batch returns a new Batch bound to this service that commits
// automatically once more than 8 MiB of raw node data is buffered.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 * 1024 * 1024
	return &Batch{
		ds:      n,
		MaxSize: defaultMaxSize,
	}
}

74
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
75
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
76
	if n == nil {
77
		return nil, fmt.Errorf("dagService is nil")
78
	}
Jeromy's avatar
Jeromy committed
79

80 81
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
82

Jeromy's avatar
Jeromy committed
83
	b, err := n.Blocks.GetBlock(ctx, c)
84
	if err != nil {
85 86 87
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
88
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
89 90
	}

91 92 93 94 95 96
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
97
	switch c.Type() {
Jeromy's avatar
Jeromy committed
98
	case cid.DagProtobuf:
99
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
100 101 102 103 104
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
105
		}
106 107

		decnd.cached = b.Cid()
108
		decnd.Prefix = b.Cid().Prefix()
109 110 111
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
112
	case cid.DagCBOR:
113
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
114
	default:
115
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
116
	}
117
}
Jeromy's avatar
Jeromy committed
118

119 120
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
121
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
122 123 124
	if c.Type() == cid.Raw {
		return nil, nil
	}
125 126 127 128
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
129
	return node.Links(), nil
130 131
}

132
func (n *dagService) GetOfflineLinkService() LinkService {
133 134
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
135 136 137 138 139 140
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
141
func (n *dagService) Remove(nd node.Node) error {
142
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
143 144
}

145 146 147
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
148 149 150 151 152 153 154 155 156 157
func GetLinksDirect(serv DAGService) GetLinks {
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		node, err := serv.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return node.Links(), nil
	}
}

158
// FetchGraph fetches all nodes that are children of the given node
159 160 161
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
162
		return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
163 164 165 166 167 168 169 170 171 172
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
173
	return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
Jeromy's avatar
Jeromy committed
174
}
175

Jeromy's avatar
Jeromy committed
176 177
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
178
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
179
	var out []int
Jeromy's avatar
Jeromy committed
180 181
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
182
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
183 184
		}
	}
Jeromy's avatar
Jeromy committed
185
	return out
Jeromy's avatar
Jeromy committed
186 187
}

188
type NodeOption struct {
Jeromy's avatar
Jeromy committed
189
	Node node.Node
190 191 192
	Err  error
}

Jeromy's avatar
Jeromy committed
193
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
194
	out := make(chan *NodeOption, len(keys))
195
	blocks := ds.Blocks.GetBlocks(ctx, keys)
196 197
	var count int

198 199 200 201 202 203
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
204
					if count != len(keys) {
205
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
206
					}
207 208
					return
				}
Jeromy's avatar
Jeromy committed
209

210 211 212
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
213 214
					return
				}
Jeromy's avatar
Jeromy committed
215

216
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
217 218
				count++

219
			case <-ctx.Done():
220
				out <- &NodeOption{Err: ctx.Err()}
221 222 223 224
				return
			}
		}
	}()
225
	return out
226 227
}

228
// GetDAG will fill out all of the links of the given Node.
229 230
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
231
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
232
	var cids []*cid.Cid
233 234
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
235 236
	}

Jeromy's avatar
Jeromy committed
237
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
238 239
}

Jeromy's avatar
Jeromy committed
240 241
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
242
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
243 244 245 246 247 248

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
249
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
250
	for i := range keys {
251
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
252 253
	}

254
	dedupedKeys := dedupeKeys(keys)
255
	go func() {
256 257 258
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

259
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
260

261
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
262
			select {
263
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
264
				if !ok {
Jeromy's avatar
Jeromy committed
265 266 267
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
268 269
					return
				}
Jeromy's avatar
Jeromy committed
270

271
				if opt.Err != nil {
272 273 274
					for _, p := range promises {
						p.Fail(opt.Err)
					}
275 276 277 278
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
279
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
280
				for _, i := range is {
281
					count++
282
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
283 284 285
				}
			case <-ctx.Done():
				return
286 287 288
			}
		}
	}()
Jeromy's avatar
Jeromy committed
289 290 291
	return promises
}

292
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
293 294 295 296
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
297
	}
Jeromy's avatar
Jeromy committed
298
	return set.Keys()
299 300
}

301
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
302
	return &nodePromise{
Jeromy's avatar
Jeromy committed
303
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
304
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
305
		err:  make(chan error, 1),
306
	}
Jeromy's avatar
Jeromy committed
307 308 309
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
310
	cache node.Node
311
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
312
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
313
	ctx   context.Context
Jeromy's avatar
Jeromy committed
314
	err   chan error
Jeromy's avatar
Jeromy committed
315 316
}

Jeromy's avatar
Jeromy committed
317 318 319 320
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
321
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
322
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
323
	Fail(err error)
Jeromy's avatar
Jeromy committed
324
	Send(node.Node)
Jeromy's avatar
Jeromy committed
325 326 327
}

func (np *nodePromise) Fail(err error) {
328 329 330 331 332 333 334 335 336
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
337
	np.err <- err
Jeromy's avatar
Jeromy committed
338 339
}

Jeromy's avatar
Jeromy committed
340
func (np *nodePromise) Send(nd node.Node) {
341 342
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
343
	if np.cache != nil {
344 345 346 347 348 349 350 351 352 353 354 355
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
356
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
357 358 359 360 361
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
362 363 364
	}

	select {
365 366
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
367 368
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
369 370
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
371 372
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
373
	}
374
}
375 376 377 378

type Batch struct {
	ds *dagService

379
	blocks  []blocks.Block
380 381 382 383
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
384
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
385
	t.blocks = append(t.blocks, nd)
386
	t.size += len(nd.RawData())
387
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
388
		return nd.Cid(), t.Commit()
389
	}
Jeromy's avatar
Jeromy committed
390
	return nd.Cid(), nil
391 392 393
}

func (t *Batch) Commit() error {
394 395
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
396 397 398
	t.size = 0
	return err
}
399

400 401
// GetLinks resolves the links of the node identified by a CID. It is the
// walker callback used by EnumerateChildren and EnumerateChildrenAsync, and
// the type GetLinksDirect builds.
type GetLinks func(ctx context.Context, c *cid.Cid) ([]*node.Link, error)

402 403 404
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
405 406 407
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
408 409
		return err
	}
410
	for _, lnk := range links {
411
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
412
		if visit(c) {
413
			err = EnumerateChildren(ctx, getLinks, c, visit)
414 415 416 417 418 419 420
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
421

422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
// ProgressTracker counts visited nodes. FetchGraph finds it in a context
// prepared with DeriveContext.
type ProgressTracker struct {
	Total int        // number of increments; read via Value when shared
	lk    sync.Mutex // guards Total
}

// DeriveContext returns a child of ctx that carries p under the "progress"
// key, which is where FetchGraph looks for a tracker.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", p)
	return derived
}

// Increment adds one to Total under the lock.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns Total, read under the lock.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	v := p.Total
	p.lk.Unlock()
	return v
}

443 444 445
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts to fetch links
// concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
446

447
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
448
	feed := make(chan *cid.Cid)
449
	out := make(chan []*node.Link)
450 451 452
	done := make(chan struct{})

	var setlk sync.Mutex
453

454 455
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
456

457
	defer cancel()
458

459 460
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
461
			for ic := range feed {
462
				links, err := getLinks(ctx, ic)
463 464 465
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
466
				}
467

468 469 470
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
471

472
				if unseen {
473
					select {
474
					case out <- links:
475
					case <-fetchersCtx.Done():
476 477 478
						return
					}
				}
Jeromy's avatar
Jeromy committed
479
				select {
480
				case done <- struct{}{}:
481
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
482 483
				}
			}
484
		}()
Jeromy's avatar
Jeromy committed
485
	}
486
	defer close(feed)
Jeromy's avatar
Jeromy committed
487

488
	send := feed
489
	var todobuffer []*cid.Cid
490
	var inProgress int
Jeromy's avatar
Jeromy committed
491

492
	next := c
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
509 510
		case links := <-out:
			for _, lnk := range links {
511 512 513 514 515 516
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
517
			}
518 519
		case err := <-errChan:
			return err
520

521
		case <-ctx.Done():
522
			return ctx.Err()
523
		}
Jeromy's avatar
Jeromy committed
524
	}
525

Jeromy's avatar
Jeromy committed
526
}