merkledag.go 11.2 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15 16
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
17
	ipldcbor "gx/ipfs/QmdaC21UyoyN3t9QdapHZfsaUo3mqVf5p4CEuFaYVFqwap/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68 69 70 71
// Batch returns a new Batch bound to this service, configured with an
// 8 MiB MaxSize.
func (n *dagService) Batch() *Batch {
	const maxBatchBytes = 8 * 1024 * 1024
	return &Batch{ds: n, MaxSize: maxBatchBytes}
}

72
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
73
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
74
	if n == nil {
75
		return nil, fmt.Errorf("dagService is nil")
76
	}
Jeromy's avatar
Jeromy committed
77

78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, c)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
87 88
	}

89 90 91 92 93 94
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
95
	switch c.Type() {
Jeromy's avatar
Jeromy committed
96
	case cid.DagProtobuf:
97
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
98 99 100 101 102
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
103
		}
104 105

		decnd.cached = b.Cid()
106
		decnd.Prefix = b.Cid().Prefix()
107 108 109
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
110
	case cid.DagCBOR:
111
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
112
	default:
113
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
114
	}
115
}
Jeromy's avatar
Jeromy committed
116

Jeromy's avatar
Jeromy committed
117
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
118 119 120
	if c.Type() == cid.Raw {
		return nil, nil
	}
121 122 123 124
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
125
	return node.Links(), nil
126 127
}

128
func (n *dagService) GetOfflineLinkService() LinkService {
129 130
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
131 132 133 134 135 136
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
137
func (n *dagService) Remove(nd node.Node) error {
138
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
139 140
}

141 142 143 144 145 146 147 148 149 150 151 152
// GetLinksDirect returns a GetLinks function that fetches the full node
// through serv.Get and reads the links off it, bypassing the LinkService.
func GetLinksDirect(serv DAGService) GetLinks {
	direct := func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		nd, err := serv.Get(ctx, c)
		if err != nil {
			return nil, err
		}
		return nd.Links(), nil
	}
	return direct
}

153
// FetchGraph fetches all nodes that are children of the given node
154 155 156
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
157
		return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
158 159 160 161 162 163 164 165 166 167
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
168
	return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit)
Jeromy's avatar
Jeromy committed
169
}
170

Jeromy's avatar
Jeromy committed
171 172
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
173
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
174
	var out []int
Jeromy's avatar
Jeromy committed
175 176
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
177
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
178 179
		}
	}
Jeromy's avatar
Jeromy committed
180
	return out
Jeromy's avatar
Jeromy committed
181 182
}

183
type NodeOption struct {
Jeromy's avatar
Jeromy committed
184
	Node node.Node
185 186 187
	Err  error
}

Jeromy's avatar
Jeromy committed
188
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
189
	out := make(chan *NodeOption, len(keys))
190
	blocks := ds.Blocks.GetBlocks(ctx, keys)
191 192
	var count int

193 194 195 196 197 198
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
199
					if count != len(keys) {
200
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
201
					}
202 203
					return
				}
Jeromy's avatar
Jeromy committed
204

205 206 207
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
208 209
					return
				}
Jeromy's avatar
Jeromy committed
210

211
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
212 213
				count++

214
			case <-ctx.Done():
215
				out <- &NodeOption{Err: ctx.Err()}
216 217 218 219
				return
			}
		}
	}()
220
	return out
221 222
}

223
// GetDAG will fill out all of the links of the given Node.
224 225
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
226
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
227
	var cids []*cid.Cid
228 229
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
230 231
	}

Jeromy's avatar
Jeromy committed
232
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
233 234
}

Jeromy's avatar
Jeromy committed
235 236
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
237
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
238 239 240 241 242 243

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
244
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
245
	for i := range keys {
246
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
247 248
	}

249
	dedupedKeys := dedupeKeys(keys)
250
	go func() {
251 252 253
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

254
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
255

256
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
257
			select {
258
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
259
				if !ok {
Jeromy's avatar
Jeromy committed
260 261 262
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
263 264
					return
				}
Jeromy's avatar
Jeromy committed
265

266
				if opt.Err != nil {
267 268 269
					for _, p := range promises {
						p.Fail(opt.Err)
					}
270 271 272 273
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
274
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
275
				for _, i := range is {
276
					count++
277
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
278 279 280
				}
			case <-ctx.Done():
				return
281 282 283
			}
		}
	}()
Jeromy's avatar
Jeromy committed
284 285 286
	return promises
}

287
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
288 289 290 291
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
292
	}
Jeromy's avatar
Jeromy committed
293
	return set.Keys()
294 295
}

296
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
297
	return &nodePromise{
Jeromy's avatar
Jeromy committed
298
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
299
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
300
		err:  make(chan error, 1),
301
	}
Jeromy's avatar
Jeromy committed
302 303 304
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
305
	cache node.Node
306
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
307
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
308
	ctx   context.Context
Jeromy's avatar
Jeromy committed
309
	err   chan error
Jeromy's avatar
Jeromy committed
310 311
}

Jeromy's avatar
Jeromy committed
312 313 314 315
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
316
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
317
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
318
	Fail(err error)
Jeromy's avatar
Jeromy committed
319
	Send(node.Node)
Jeromy's avatar
Jeromy committed
320 321 322
}

func (np *nodePromise) Fail(err error) {
323 324 325 326 327 328 329 330 331
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
332
	np.err <- err
Jeromy's avatar
Jeromy committed
333 334
}

Jeromy's avatar
Jeromy committed
335
func (np *nodePromise) Send(nd node.Node) {
336 337
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
338
	if np.cache != nil {
339 340 341 342 343 344 345 346 347 348 349 350
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
351
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
352 353 354 355 356
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
357 358 359
	}

	select {
360 361
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
362 363
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
364 365
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
366 367
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
368
	}
369
}
370 371 372 373

type Batch struct {
	ds *dagService

374
	blocks  []blocks.Block
375 376 377 378
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
379
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
380
	t.blocks = append(t.blocks, nd)
381
	t.size += len(nd.RawData())
382
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
383
		return nd.Cid(), t.Commit()
384
	}
Jeromy's avatar
Jeromy committed
385
	return nd.Cid(), nil
386 387 388
}

func (t *Batch) Commit() error {
389 390
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
391 392 393
	t.size = 0
	return err
}
394

395 396
// GetLinks is a function that returns the links of the node identified by
// the given CID.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

397 398 399
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
400 401 402
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
403 404
		return err
	}
405
	for _, lnk := range links {
406
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
407
		if visit(c) {
408
			err = EnumerateChildren(ctx, getLinks, c, visit)
409 410 411 412 413 414 415
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
416

417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437
// ProgressTracker is a mutex-guarded counter that can be attached to a
// context.
type ProgressTracker struct {
	// Total is the current count; use Value to read it under the lock.
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries this tracker under the
// "progress" key.
func (pt *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	derived := context.WithValue(ctx, "progress", pt)
	return derived
}

// Increment adds one to the counter.
func (pt *ProgressTracker) Increment() {
	pt.lk.Lock()
	pt.Total++
	pt.lk.Unlock()
}

// Value returns the current count.
func (pt *ProgressTracker) Value() int {
	pt.lk.Lock()
	total := pt.Total
	pt.lk.Unlock()
	return total
}

438 439 440
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync starts to fetch links concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
441

442
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
443
	feed := make(chan *cid.Cid)
444
	out := make(chan []*node.Link)
445 446 447
	done := make(chan struct{})

	var setlk sync.Mutex
448

449 450
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
451

452
	defer cancel()
453

454 455
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
456
			for ic := range feed {
457
				links, err := getLinks(ctx, ic)
458 459 460
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
461
				}
462

463 464 465
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
466

467
				if unseen {
468
					select {
469
					case out <- links:
470
					case <-fetchersCtx.Done():
471 472 473
						return
					}
				}
Jeromy's avatar
Jeromy committed
474
				select {
475
				case done <- struct{}{}:
476
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
477 478
				}
			}
479
		}()
Jeromy's avatar
Jeromy committed
480
	}
481
	defer close(feed)
Jeromy's avatar
Jeromy committed
482

483
	send := feed
484
	var todobuffer []*cid.Cid
485
	var inProgress int
Jeromy's avatar
Jeromy committed
486

487
	next := c
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
504 505
		case links := <-out:
			for _, lnk := range links {
506 507 508 509 510 511
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
512
			}
513 514
		case err := <-errChan:
			return err
515

516
		case <-ctx.Done():
517
			return ctx.Err()
518
		}
Jeromy's avatar
Jeromy committed
519
	}
520

Jeromy's avatar
Jeromy committed
521
}