merkledag.go 10.8 KB
Newer Older
1
// package merkledag implements the IPFS Merkle DAG datastructures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15 16
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
17
	ipldcbor "gx/ipfs/QmdaC21UyoyN3t9QdapHZfsaUo3mqVf5p4CEuFaYVFqwap/go-ipld-cbor"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68 69 70 71
func (n *dagService) Batch() *Batch {
	return &Batch{ds: n, MaxSize: 8 * 1024 * 1024}
}

72
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
73
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
74
	if n == nil {
75
		return nil, fmt.Errorf("dagService is nil")
76
	}
Jeromy's avatar
Jeromy committed
77

78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, c)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
87 88
	}

89 90 91 92 93 94
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
95
	switch c.Type() {
Jeromy's avatar
Jeromy committed
96
	case cid.DagProtobuf:
97
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
98 99 100 101 102
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
103
		}
104 105

		decnd.cached = b.Cid()
106
		decnd.Prefix = b.Cid().Prefix()
107 108 109
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
110
	case cid.DagCBOR:
111
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
112
	default:
113
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
114
	}
115
}
Jeromy's avatar
Jeromy committed
116

Jeromy's avatar
Jeromy committed
117
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
118 119 120
	if c.Type() == cid.Raw {
		return nil, nil
	}
121 122 123 124
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
125
	return node.Links(), nil
126 127
}

128
func (n *dagService) GetOfflineLinkService() LinkService {
129 130
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
131 132 133 134 135 136
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
137
func (n *dagService) Remove(nd node.Node) error {
138
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
139 140
}

141
// FetchGraph fetches all nodes that are children of the given node
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
		return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
	return EnumerateChildrenAsync(ctx, serv, root, visit)
Jeromy's avatar
Jeromy committed
157
}
158

Jeromy's avatar
Jeromy committed
159 160
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
161
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
162
	var out []int
Jeromy's avatar
Jeromy committed
163 164
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
165
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
166 167
		}
	}
Jeromy's avatar
Jeromy committed
168
	return out
Jeromy's avatar
Jeromy committed
169 170
}

171
type NodeOption struct {
Jeromy's avatar
Jeromy committed
172
	Node node.Node
173 174 175
	Err  error
}

Jeromy's avatar
Jeromy committed
176
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
177
	out := make(chan *NodeOption, len(keys))
178
	blocks := ds.Blocks.GetBlocks(ctx, keys)
179 180
	var count int

181 182 183 184 185 186
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
187
					if count != len(keys) {
188
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
189
					}
190 191
					return
				}
Jeromy's avatar
Jeromy committed
192

193 194 195
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
196 197
					return
				}
Jeromy's avatar
Jeromy committed
198

199
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
200 201
				count++

202
			case <-ctx.Done():
203
				out <- &NodeOption{Err: ctx.Err()}
204 205 206 207
				return
			}
		}
	}()
208
	return out
209 210
}

211
// GetDAG will fill out all of the links of the given Node.
212 213
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
214
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
215
	var cids []*cid.Cid
216 217
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
218 219
	}

Jeromy's avatar
Jeromy committed
220
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
221 222
}

Jeromy's avatar
Jeromy committed
223 224
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
225
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
226 227 228 229 230 231

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
232
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
233
	for i := range keys {
234
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
235 236
	}

237
	dedupedKeys := dedupeKeys(keys)
238
	go func() {
239 240 241
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

242
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
243

244
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
245
			select {
246
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
247
				if !ok {
Jeromy's avatar
Jeromy committed
248 249 250
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
251 252
					return
				}
Jeromy's avatar
Jeromy committed
253

254
				if opt.Err != nil {
255 256 257
					for _, p := range promises {
						p.Fail(opt.Err)
					}
258 259 260 261
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
262
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
263
				for _, i := range is {
264
					count++
265
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
266 267 268
				}
			case <-ctx.Done():
				return
269 270 271
			}
		}
	}()
Jeromy's avatar
Jeromy committed
272 273 274
	return promises
}

275
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
276 277 278 279
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
280
	}
Jeromy's avatar
Jeromy committed
281
	return set.Keys()
282 283
}

284
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
285
	return &nodePromise{
Jeromy's avatar
Jeromy committed
286
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
287
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
288
		err:  make(chan error, 1),
289
	}
Jeromy's avatar
Jeromy committed
290 291 292
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
293
	cache node.Node
294
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
295
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
296
	ctx   context.Context
Jeromy's avatar
Jeromy committed
297
	err   chan error
Jeromy's avatar
Jeromy committed
298 299
}

Jeromy's avatar
Jeromy committed
300 301 302 303
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
304
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
305
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
306
	Fail(err error)
Jeromy's avatar
Jeromy committed
307
	Send(node.Node)
Jeromy's avatar
Jeromy committed
308 309 310
}

func (np *nodePromise) Fail(err error) {
311 312 313 314 315 316 317 318 319
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
320
	np.err <- err
Jeromy's avatar
Jeromy committed
321 322
}

Jeromy's avatar
Jeromy committed
323
func (np *nodePromise) Send(nd node.Node) {
324 325
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
326
	if np.cache != nil {
327 328 329 330 331 332 333 334 335 336 337 338
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
339
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
340 341 342 343 344
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
345 346 347
	}

	select {
348 349
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
350 351
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
352 353
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
354 355
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
356
	}
357
}
358 359 360 361

type Batch struct {
	ds *dagService

362
	blocks  []blocks.Block
363 364 365 366
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
367
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
368
	t.blocks = append(t.blocks, nd)
369
	t.size += len(nd.RawData())
370
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
371
		return nd.Cid(), t.Commit()
372
	}
Jeromy's avatar
Jeromy committed
373
	return nd.Cid(), nil
374 375 376
}

func (t *Batch) Commit() error {
377 378
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
379 380 381
	t.size = 0
	return err
}
382 383 384 385

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
386 387 388 389
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
390 391
		return err
	}
392
	for _, lnk := range links {
393
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
394
		if visit(c) {
395
			err = EnumerateChildren(ctx, getLinks, c, visit)
396 397 398 399 400 401 402
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
403

404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, "progress", p)
}

func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.Total++
}

func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	defer p.lk.Unlock()
	return p.Total
}

425 426 427
// FetchGraphConcurrency is total number of concurrent fetches that
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
428

429
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
430 431
	feed := make(chan *cid.Cid)
	out := make(chan node.Node)
432 433 434
	done := make(chan struct{})

	var setlk sync.Mutex
435

436 437
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
438

439
	defer cancel()
440

441 442
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
443 444 445 446 447
			for ic := range feed {
				n, err := ds.Get(ctx, ic)
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
448
				}
449

450 451 452
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
453

454
				if unseen {
455
					select {
456 457
					case out <- n:
					case <-fetchersCtx.Done():
458 459 460
						return
					}
				}
Jeromy's avatar
Jeromy committed
461
				select {
462
				case done <- struct{}{}:
463
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
464 465
				}
			}
466
		}()
Jeromy's avatar
Jeromy committed
467
	}
468
	defer close(feed)
Jeromy's avatar
Jeromy committed
469

470
	send := feed
471
	var todobuffer []*cid.Cid
472
	var inProgress int
Jeromy's avatar
Jeromy committed
473

474
	next := c
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
491 492 493 494 495 496 497 498
		case nd := <-out:
			for _, lnk := range nd.Links() {
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
499
			}
500 501
		case err := <-errChan:
			return err
502

503
		case <-ctx.Done():
504
			return ctx.Err()
505
		}
Jeromy's avatar
Jeromy committed
506
	}
507

Jeromy's avatar
Jeromy committed
508
}