merkledag.go 10.7 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"strings"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Jeromy's avatar
Jeromy committed
11

12
	"context"
Jeromy's avatar
Jeromy committed
13
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
14
	cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

Jeromy's avatar
Jeromy committed
17
var log = logging.Logger("merkledag")
18
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
19

Jeromy's avatar
Jeromy committed
20 21
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
22 23
	Add(*Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (*Node, error)
Jeromy's avatar
Jeromy committed
24
	Remove(*Node) error
25

26 27 28 29
	// Return all links for a node, may be more effect than
	// calling Get
	GetLinks(context.Context, *cid.Cid) ([]*Link, error)

30 31
	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
32
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
33 34

	Batch() *Batch
Jeromy's avatar
Jeromy committed
35 36
}

37 38 39 40 41 42 43 44
// LinkService returns the links of a node when they are available locally,
// without having to retrieve the whole block from the datastore.
type LinkService interface {
	// Get returns the links of the node identified by c.
	Get(c *cid.Cid) ([]*Link, error)
}

func NewDAGService(bs *bserv.BlockService) *dagService {
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
45 46
}

47
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
48 49
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
50 51
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
52
type dagService struct {
53 54
	Blocks      *bserv.BlockService
	LinkService LinkService
55 56
}

57
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
58
func (n *dagService) Add(nd *Node) (*cid.Cid, error) {
59
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
60
		return nil, fmt.Errorf("dagService is nil")
61 62
	}

Jeromy's avatar
Jeromy committed
63
	return n.Blocks.AddObject(nd)
64 65
}

66 67 68 69
// Batch returns a new, empty Batch bound to this service. Batch.Add commits
// automatically once the buffered encoded size exceeds 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxBatchSize = 8 << 20 // 8 MiB
	b := &Batch{ds: n}
	b.MaxSize = defaultMaxBatchSize
	return b
}

70
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
71
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
72
	if n == nil {
73
		return nil, fmt.Errorf("dagService is nil")
74
	}
Jeromy's avatar
Jeromy committed
75

76 77
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
78

Jeromy's avatar
Jeromy committed
79
	b, err := n.Blocks.GetBlock(ctx, c)
80
	if err != nil {
81 82 83
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
84
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
85 86
	}

Jeromy's avatar
Jeromy committed
87 88 89 90 91 92 93 94 95
	var res *Node
	switch c.Type() {
	case cid.Protobuf:
		out, err := DecodeProtobuf(b.RawData())
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
96
		}
Jeromy's avatar
Jeromy committed
97 98 99
		res = out
	default:
		return nil, fmt.Errorf("unrecognized formatting type")
100
	}
101

Jeromy's avatar
Jeromy committed
102
	res.cached = c
103

104
	return res, nil
105
}
Jeromy's avatar
Jeromy committed
106

107 108 109 110 111 112 113 114 115 116 117 118 119 120
// GetLinks returns the links of the node identified by c. When a LinkService
// is configured and answers without error its result is used; otherwise
// (including when the LinkService errors, which is deliberately ignored) the
// full node is fetched with Get and its Links are returned.
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
	if ls := n.LinkService; ls != nil {
		if links, err := ls.Get(c); err == nil {
			return links, nil
		}
	}

	nd, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
	return nd.Links, nil
}

Jeromy's avatar
Jeromy committed
121
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
122
	return n.Blocks.DeleteObject(nd)
Jeromy's avatar
Jeromy committed
123 124
}

125 126
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
127
	return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
128
}
129

Jeromy's avatar
Jeromy committed
130 131
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
132
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
133
	var out []int
Jeromy's avatar
Jeromy committed
134 135
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
136
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
137 138
		}
	}
Jeromy's avatar
Jeromy committed
139
	return out
Jeromy's avatar
Jeromy committed
140 141
}

142 143 144 145 146
// NodeOption is one result delivered by GetMany: either a fetched Node or
// the error that ended the request.
type NodeOption struct {
	Node *Node // the fetched node; nil when Err is set
	Err  error // why the request stopped; nil for a successful result
}

Jeromy's avatar
Jeromy committed
147 148 149 150 151 152
// TODO: this is a mid-term hack to get around the fact that blocks don't
// have full CIDs and potentially (though we don't know of any such scenario)
// may have the same block with multiple different encodings.
// We have discussed the possiblity of using CIDs as datastore keys
// in the future. This would be a much larger changeset than i want to make
// right now.
Jeromy's avatar
Jeromy committed
153 154 155 156 157 158 159 160 161
func cidsToKeyMapping(cids []*cid.Cid) map[key.Key]*cid.Cid {
	mapping := make(map[key.Key]*cid.Cid)
	for _, c := range cids {
		mapping[key.Key(c.Hash())] = c
	}
	return mapping
}

func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
162
	out := make(chan *NodeOption, len(keys))
163
	blocks := ds.Blocks.GetBlocks(ctx, keys)
164 165
	var count int

Jeromy's avatar
Jeromy committed
166 167
	mapping := cidsToKeyMapping(keys)

168 169 170 171 172 173
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
174
					if count != len(keys) {
175
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
176
					}
177 178
					return
				}
Jeromy's avatar
Jeromy committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193

				c := mapping[b.Key()]

				var nd *Node
				switch c.Type() {
				case cid.Protobuf:
					decnd, err := DecodeProtobuf(b.RawData())
					if err != nil {
						out <- &NodeOption{Err: err}
						return
					}
					decnd.cached = cid.NewCidV0(b.Multihash())
					nd = decnd
				default:
					out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
194 195
					return
				}
Jeromy's avatar
Jeromy committed
196 197

				// buffered, no need to select
198
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
199 200
				count++

201
			case <-ctx.Done():
202
				out <- &NodeOption{Err: ctx.Err()}
203 204 205 206
				return
			}
		}
	}()
207
	return out
208 209
}

210
// GetDAG will fill out all of the links of the given Node.
211 212
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
213
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
214
	var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
215
	for _, lnk := range root.Links {
Jeromy's avatar
Jeromy committed
216
		cids = append(cids, cid.NewCidV0(lnk.Hash))
Jeromy's avatar
Jeromy committed
217 218
	}

Jeromy's avatar
Jeromy committed
219
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
220 221
}

Jeromy's avatar
Jeromy committed
222 223
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
224
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
225 226 227 228 229 230

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
231
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
232
	for i := range keys {
233
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
234 235
	}

236
	dedupedKeys := dedupeKeys(keys)
237
	go func() {
238 239 240
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

241
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
242

243
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
244
			select {
245
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
246
				if !ok {
Jeromy's avatar
Jeromy committed
247 248 249
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
250 251
					return
				}
Jeromy's avatar
Jeromy committed
252

253
				if opt.Err != nil {
254 255 256
					for _, p := range promises {
						p.Fail(opt.Err)
					}
257 258 259 260
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
261
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
262
				for _, i := range is {
263
					count++
264
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
265 266 267
				}
			case <-ctx.Done():
				return
268 269 270
			}
		}
	}()
Jeromy's avatar
Jeromy committed
271 272 273
	return promises
}

274
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
275 276 277 278
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
279
	}
Jeromy's avatar
Jeromy committed
280
	return set.Keys()
281 282
}

283
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
284
	return &nodePromise{
285
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
286
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
287
		err:  make(chan error, 1),
288
	}
Jeromy's avatar
Jeromy committed
289 290 291 292
}

type nodePromise struct {
	cache *Node
293 294
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
295
	ctx   context.Context
Jeromy's avatar
Jeromy committed
296
	err   chan error
Jeromy's avatar
Jeromy committed
297 298
}

Jeromy's avatar
Jeromy committed
299 300 301 302
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
303
type NodeGetter interface {
304
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
305
	Fail(err error)
306
	Send(*Node)
Jeromy's avatar
Jeromy committed
307 308 309
}

func (np *nodePromise) Fail(err error) {
310 311 312 313 314 315 316 317 318
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
319
	np.err <- err
Jeromy's avatar
Jeromy committed
320 321
}

322 323 324
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
325
	if np.cache != nil {
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
344 345 346
	}

	select {
347 348
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
349 350
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
351 352
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
353 354
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
355
	}
356
}
357 358 359 360

type Batch struct {
	ds *dagService

Jeromy's avatar
Jeromy committed
361
	objects []bserv.Object
362 363 364 365
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
366
func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
367
	d, err := nd.EncodeProtobuf(false)
368
	if err != nil {
Jeromy's avatar
Jeromy committed
369
		return nil, err
370 371
	}

Jeromy's avatar
Jeromy committed
372 373
	t.objects = append(t.objects, nd)
	t.size += len(d)
374
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
375
		return nd.Cid(), t.Commit()
376
	}
Jeromy's avatar
Jeromy committed
377
	return nd.Cid(), nil
378 379 380
}

func (t *Batch) Commit() error {
Jeromy's avatar
Jeromy committed
381 382
	_, err := t.ds.Blocks.AddObjects(t.objects)
	t.objects = nil
383 384 385
	t.size = 0
	return err
}
386

Jeromy's avatar
Jeromy committed
387 388 389 390
// legacyCidFromLink builds a version-0 CID from the link's multihash.
func legacyCidFromLink(lnk *Link) *cid.Cid {
	h := lnk.Hash
	return cid.NewCidV0(h)
}

391 392 393
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
394 395
func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error {
	for _, lnk := range links {
Jeromy's avatar
Jeromy committed
396 397
		c := legacyCidFromLink(lnk)
		if visit(c) {
398
			children, err := ds.GetLinks(ctx, c)
399
			if err != nil {
400 401 402 403 404
				if bestEffort && err == ErrNotFound {
					continue
				} else {
					return err
				}
405
			}
406
			err = EnumerateChildren(ctx, ds, children, visit, bestEffort)
407 408 409 410 411 412 413
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
414

Jeromy's avatar
Jeromy committed
415 416
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool) error {
	toprocess := make(chan []*cid.Cid, 8)
417
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
418 419 420 421 422

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

423
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
424

425
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
426 427 428 429
	live := 1

	for {
		select {
430
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
431 432 433
			if !ok {
				return nil
			}
434 435 436 437 438 439 440

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
441 442 443
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
444
			var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
445
			for _, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
446 447
				c := legacyCidFromLink(lnk)
				if visit(c) {
Jeromy's avatar
Jeromy committed
448
					live++
Jeromy's avatar
Jeromy committed
449
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
450 451 452 453 454 455 456
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
457
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
458
				select {
Jeromy's avatar
Jeromy committed
459
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
460 461 462 463 464 465 466 467 468 469
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
470
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
471 472 473 474 475 476 477
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
478

Jeromy's avatar
Jeromy committed
479
	get := func(ks []*cid.Cid) {
480 481 482
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
483
			select {
484 485
			case out <- opt:
			case <-ctx.Done():
486
				return
Jeromy's avatar
Jeromy committed
487 488 489 490 491
			}
		}
	}

	for ks := range in {
492
		wg.Add(1)
493
		go get(ks)
Jeromy's avatar
Jeromy committed
494 495
	}
}