merkledag.go 9.94 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15 16
	node "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node"
	cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

Jeromy's avatar
Jeromy committed
19
var log = logging.Logger("merkledag")
20
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
21

Jeromy's avatar
Jeromy committed
22 23
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
24 25 26
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
27 28 29

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
30
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
31 32

	Batch() *Batch
33 34

	LinkService
Jeromy's avatar
Jeromy committed
35 36
}

37
type LinkService interface {
38 39
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
40
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
41 42

	GetOfflineLinkService() LinkService
43 44
}

45
func NewDAGService(bs bserv.BlockService) *dagService {
46
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
47 48
}

49
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
50 51
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
52 53
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
54
type dagService struct {
55
	Blocks bserv.BlockService
56 57
}

58
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
59
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
60
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
61
		return nil, fmt.Errorf("dagService is nil")
62 63
	}

64
	return n.Blocks.AddBlock(nd)
65 66
}

67 68 69 70
// Batch returns a new Batch bound to this dagService, with MaxSize set to
// 8 * 1024 * 1024 bytes.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 * 1024 * 1024
	return &Batch{ds: n, MaxSize: defaultMaxSize}
}

71
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
72
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
73
	if n == nil {
74
		return nil, fmt.Errorf("dagService is nil")
75
	}
Jeromy's avatar
Jeromy committed
76

77 78
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
79

Jeromy's avatar
Jeromy committed
80
	b, err := n.Blocks.GetBlock(ctx, c)
81
	if err != nil {
82 83 84
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
85
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
86 87
	}

88 89 90 91 92 93
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
94 95
	switch c.Type() {
	case cid.Protobuf:
96
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
97 98 99 100 101
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
102
		}
103 104 105 106 107

		decnd.cached = b.Cid()
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
108
	default:
109
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
110
	}
111
}
Jeromy's avatar
Jeromy committed
112

Jeromy's avatar
Jeromy committed
113
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
114 115 116 117
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
118
	return node.Links(), nil
119 120
}

121
func (n *dagService) GetOfflineLinkService() LinkService {
122 123
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
124 125 126 127 128 129
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
130
func (n *dagService) Remove(nd node.Node) error {
131
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
132 133
}

134
// FetchGraph fetches all nodes that are children of the given node
135 136
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
137
}
138

Jeromy's avatar
Jeromy committed
139 140
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
141
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
142
	var out []int
Jeromy's avatar
Jeromy committed
143 144
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
145
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
146 147
		}
	}
Jeromy's avatar
Jeromy committed
148
	return out
Jeromy's avatar
Jeromy committed
149 150
}

151
type NodeOption struct {
Jeromy's avatar
Jeromy committed
152
	Node node.Node
153 154 155
	Err  error
}

Jeromy's avatar
Jeromy committed
156
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
157
	out := make(chan *NodeOption, len(keys))
158
	blocks := ds.Blocks.GetBlocks(ctx, keys)
159 160
	var count int

161 162 163 164 165 166
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
167
					if count != len(keys) {
168
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
169
					}
170 171
					return
				}
Jeromy's avatar
Jeromy committed
172

173 174 175
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
176 177
					return
				}
Jeromy's avatar
Jeromy committed
178

179
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
180 181
				count++

182
			case <-ctx.Done():
183
				out <- &NodeOption{Err: ctx.Err()}
184 185 186 187
				return
			}
		}
	}()
188
	return out
189 190
}

191
// GetDAG will fill out all of the links of the given Node.
192 193
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
194
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
195
	var cids []*cid.Cid
196 197
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
198 199
	}

Jeromy's avatar
Jeromy committed
200
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
201 202
}

Jeromy's avatar
Jeromy committed
203 204
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
205
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
206 207 208 209 210 211

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
212
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
213
	for i := range keys {
214
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
215 216
	}

217
	dedupedKeys := dedupeKeys(keys)
218
	go func() {
219 220 221
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

222
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
223

224
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
225
			select {
226
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
227
				if !ok {
Jeromy's avatar
Jeromy committed
228 229 230
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
231 232
					return
				}
Jeromy's avatar
Jeromy committed
233

234
				if opt.Err != nil {
235 236 237
					for _, p := range promises {
						p.Fail(opt.Err)
					}
238 239 240 241
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
242
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
243
				for _, i := range is {
244
					count++
245
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
246 247 248
				}
			case <-ctx.Done():
				return
249 250 251
			}
		}
	}()
Jeromy's avatar
Jeromy committed
252 253 254
	return promises
}

255
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
256 257 258 259
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
260
	}
Jeromy's avatar
Jeromy committed
261
	return set.Keys()
262 263
}

264
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
265
	return &nodePromise{
Jeromy's avatar
Jeromy committed
266
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
267
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
268
		err:  make(chan error, 1),
269
	}
Jeromy's avatar
Jeromy committed
270 271 272
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
273
	cache node.Node
274
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
275
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
276
	ctx   context.Context
Jeromy's avatar
Jeromy committed
277
	err   chan error
Jeromy's avatar
Jeromy committed
278 279
}

Jeromy's avatar
Jeromy committed
280 281 282 283
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
284
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
285
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
286
	Fail(err error)
Jeromy's avatar
Jeromy committed
287
	Send(node.Node)
Jeromy's avatar
Jeromy committed
288 289 290
}

func (np *nodePromise) Fail(err error) {
291 292 293 294 295 296 297 298 299
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
300
	np.err <- err
Jeromy's avatar
Jeromy committed
301 302
}

Jeromy's avatar
Jeromy committed
303
func (np *nodePromise) Send(nd node.Node) {
304 305
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
306
	if np.cache != nil {
307 308 309 310 311 312 313 314 315 316 317 318
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
319
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
320 321 322 323 324
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
325 326 327
	}

	select {
328 329
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
330 331
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
332 333
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
334 335
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
336
	}
337
}
338 339 340 341

type Batch struct {
	ds *dagService

342
	blocks  []blocks.Block
343 344 345 346
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
347
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
348
	t.blocks = append(t.blocks, nd)
349
	t.size += len(nd.RawData())
350
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
351
		return nd.Cid(), t.Commit()
352
	}
Jeromy's avatar
Jeromy committed
353
	return nd.Cid(), nil
354 355 356
}

func (t *Batch) Commit() error {
357 358
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
359 360 361
	t.size = 0
	return err
}
362 363 364 365

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
366 367 368 369 370 371 372
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
373
	for _, lnk := range links {
374
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
375
		if visit(c) {
376
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
377 378 379 380 381 382 383
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
384

385
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
386
	toprocess := make(chan []*cid.Cid, 8)
387
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
388 389 390 391 392

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

393
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
394

395 396 397 398 399
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

400
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
401 402 403 404
	live := 1

	for {
		select {
405
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
406 407 408
			if !ok {
				return nil
			}
409 410 411 412 413 414 415

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
416 417 418
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
419
			var cids []*cid.Cid
420 421
			for _, lnk := range nd.Links() {
				c := lnk.Cid
Jeromy's avatar
Jeromy committed
422
				if visit(c) {
Jeromy's avatar
Jeromy committed
423
					live++
Jeromy's avatar
Jeromy committed
424
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
425 426 427 428 429 430 431
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
432
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
433
				select {
Jeromy's avatar
Jeromy committed
434
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
435 436 437 438 439 440 441 442 443 444
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
445
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
446 447 448 449 450 451 452
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
453

Jeromy's avatar
Jeromy committed
454
	get := func(ks []*cid.Cid) {
455 456 457
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
458
			select {
459 460
			case out <- opt:
			case <-ctx.Done():
461
				return
Jeromy's avatar
Jeromy committed
462 463 464 465 466
			}
		}
	}

	for ks := range in {
467
		wg.Add(1)
468
		go get(ks)
Jeromy's avatar
Jeromy committed
469 470
	}
}