merkledag.go 10.8 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"strings"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	offline "github.com/ipfs/go-ipfs/exchange/offline"
11
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Jeromy's avatar
Jeromy committed
12

13
	"context"
Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15
	cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Jeromy's avatar
Jeromy committed
18
var log = logging.Logger("merkledag")
19
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
20

Jeromy's avatar
Jeromy committed
21 22
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
23 24
	Add(*Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (*Node, error)
Jeromy's avatar
Jeromy committed
25
	Remove(*Node) error
26 27 28

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
29
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
30 31

	Batch() *Batch
32 33

	LinkService
Jeromy's avatar
Jeromy committed
34 35
}

36
type LinkService interface {
37 38 39 40 41
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
	GetLinks(context.Context, *cid.Cid) ([]*Link, error)

	GetOfflineLinkService() LinkService
42 43 44 45
}

func NewDAGService(bs *bserv.BlockService) *dagService {
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
46 47
}

48
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
49 50
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
51 52
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
53
type dagService struct {
54
	Blocks *bserv.BlockService
55 56
}

57
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
58
func (n *dagService) Add(nd *Node) (*cid.Cid, error) {
59
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
60
		return nil, fmt.Errorf("dagService is nil")
61 62
	}

Jeromy's avatar
Jeromy committed
63
	return n.Blocks.AddObject(nd)
64 65
}

66 67 68 69
// Batch returns a new Batch bound to this service. The batch commits itself
// from Add once its buffered encoded size exceeds 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultBatchSize = 8 << 20 // 8 MiB
	return &Batch{MaxSize: defaultBatchSize, ds: n}
}

70
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
71
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
72
	if n == nil {
73
		return nil, fmt.Errorf("dagService is nil")
74
	}
Jeromy's avatar
Jeromy committed
75

76 77
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
78

Jeromy's avatar
Jeromy committed
79
	b, err := n.Blocks.GetBlock(ctx, c)
80
	if err != nil {
81 82 83
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
84
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
85 86
	}

Jeromy's avatar
Jeromy committed
87 88 89 90 91 92 93 94 95
	var res *Node
	switch c.Type() {
	case cid.Protobuf:
		out, err := DecodeProtobuf(b.RawData())
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
96
		}
Jeromy's avatar
Jeromy committed
97 98 99
		res = out
	default:
		return nil, fmt.Errorf("unrecognized formatting type")
100
	}
101

Jeromy's avatar
Jeromy committed
102
	res.cached = c
103

104
	return res, nil
105
}
Jeromy's avatar
Jeromy committed
106

107 108 109 110 111 112 113 114
// GetLinks returns the links of the node identified by c. It fetches the
// whole node with Get.
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
	nd, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
	links := nd.Links
	return links, nil
}

115 116 117 118 119 120 121 122 123
// GetOfflineLinkService returns a LinkService that only consults the local
// blockstore. If this service's exchange is already offline, n itself is
// returned. Otherwise a new dagService is built over the same blockstore with
// an offline exchange.
func (n *dagService) GetOfflineLinkService() LinkService {
	if !n.Blocks.Exchange.IsOnline() {
		return n
	}
	bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore))
	return NewDAGService(bsrv)
}

Jeromy's avatar
Jeromy committed
124
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
125
	return n.Blocks.DeleteObject(nd)
Jeromy's avatar
Jeromy committed
126 127
}

128
// FetchGraph fetches all nodes that are children of the given node
129 130
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
131
}
132

Jeromy's avatar
Jeromy committed
133 134
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
135
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
136
	var out []int
Jeromy's avatar
Jeromy committed
137 138
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
139
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
140 141
		}
	}
Jeromy's avatar
Jeromy committed
142
	return out
Jeromy's avatar
Jeromy committed
143 144
}

145 146 147 148 149
// NodeOption carries one result of a node fetch: either a decoded Node or the
// error that ended the fetch.
type NodeOption struct {
	Node *Node // the fetched node; nil when Err is set
	Err  error // non-nil when the fetch failed
}

Jeromy's avatar
Jeromy committed
150 151 152 153 154 155
// TODO: this is a mid-term hack to get around the fact that blocks don't
// have full CIDs and potentially (though we don't know of any such scenario)
// may have the same block with multiple different encodings.
// We have discussed the possiblity of using CIDs as datastore keys
// in the future. This would be a much larger changeset than i want to make
// right now.
Jeromy's avatar
Jeromy committed
156 157 158 159 160 161 162 163 164
func cidsToKeyMapping(cids []*cid.Cid) map[key.Key]*cid.Cid {
	mapping := make(map[key.Key]*cid.Cid)
	for _, c := range cids {
		mapping[key.Key(c.Hash())] = c
	}
	return mapping
}

func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
165
	out := make(chan *NodeOption, len(keys))
166
	blocks := ds.Blocks.GetBlocks(ctx, keys)
167 168
	var count int

Jeromy's avatar
Jeromy committed
169 170
	mapping := cidsToKeyMapping(keys)

171 172 173 174 175 176
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
177
					if count != len(keys) {
178
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
179
					}
180 181
					return
				}
Jeromy's avatar
Jeromy committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196

				c := mapping[b.Key()]

				var nd *Node
				switch c.Type() {
				case cid.Protobuf:
					decnd, err := DecodeProtobuf(b.RawData())
					if err != nil {
						out <- &NodeOption{Err: err}
						return
					}
					decnd.cached = cid.NewCidV0(b.Multihash())
					nd = decnd
				default:
					out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
197 198
					return
				}
Jeromy's avatar
Jeromy committed
199 200

				// buffered, no need to select
201
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
202 203
				count++

204
			case <-ctx.Done():
205
				out <- &NodeOption{Err: ctx.Err()}
206 207 208 209
				return
			}
		}
	}()
210
	return out
211 212
}

213
// GetDAG will fill out all of the links of the given Node.
214 215
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
216
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
217
	var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
218
	for _, lnk := range root.Links {
Jeromy's avatar
Jeromy committed
219
		cids = append(cids, cid.NewCidV0(lnk.Hash))
Jeromy's avatar
Jeromy committed
220 221
	}

Jeromy's avatar
Jeromy committed
222
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
223 224
}

Jeromy's avatar
Jeromy committed
225 226
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
227
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
228 229 230 231 232 233

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
234
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
235
	for i := range keys {
236
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
237 238
	}

239
	dedupedKeys := dedupeKeys(keys)
240
	go func() {
241 242 243
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

244
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
245

246
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
247
			select {
248
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
249
				if !ok {
Jeromy's avatar
Jeromy committed
250 251 252
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
253 254
					return
				}
Jeromy's avatar
Jeromy committed
255

256
				if opt.Err != nil {
257 258 259
					for _, p := range promises {
						p.Fail(opt.Err)
					}
260 261 262 263
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
264
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
265
				for _, i := range is {
266
					count++
267
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
268 269 270
				}
			case <-ctx.Done():
				return
271 272 273
			}
		}
	}()
Jeromy's avatar
Jeromy committed
274 275 276
	return promises
}

277
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
278 279 280 281
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
282
	}
Jeromy's avatar
Jeromy committed
283
	return set.Keys()
284 285
}

286
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
287
	return &nodePromise{
288
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
289
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
290
		err:  make(chan error, 1),
291
	}
Jeromy's avatar
Jeromy committed
292 293 294 295
}

type nodePromise struct {
	cache *Node
296 297
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
298
	ctx   context.Context
Jeromy's avatar
Jeromy committed
299
	err   chan error
Jeromy's avatar
Jeromy committed
300 301
}

Jeromy's avatar
Jeromy committed
302 303 304 305
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
306
type NodeGetter interface {
307
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
308
	Fail(err error)
309
	Send(*Node)
Jeromy's avatar
Jeromy committed
310 311 312
}

func (np *nodePromise) Fail(err error) {
313 314 315 316 317 318 319 320 321
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
322
	np.err <- err
Jeromy's avatar
Jeromy committed
323 324
}

325 326 327
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
328
	if np.cache != nil {
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
347 348 349
	}

	select {
350 351
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
352 353
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
354 355
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
356 357
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
358
	}
359
}
360 361 362 363

type Batch struct {
	ds *dagService

Jeromy's avatar
Jeromy committed
364
	objects []bserv.Object
365 366 367 368
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
369
func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
370
	d, err := nd.EncodeProtobuf(false)
371
	if err != nil {
Jeromy's avatar
Jeromy committed
372
		return nil, err
373 374
	}

Jeromy's avatar
Jeromy committed
375 376
	t.objects = append(t.objects, nd)
	t.size += len(d)
377
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
378
		return nd.Cid(), t.Commit()
379
	}
Jeromy's avatar
Jeromy committed
380
	return nd.Cid(), nil
381 382 383
}

func (t *Batch) Commit() error {
Jeromy's avatar
Jeromy committed
384 385
	_, err := t.ds.Blocks.AddObjects(t.objects)
	t.objects = nil
386 387 388
	t.size = 0
	return err
}
389

Jeromy's avatar
Jeromy committed
390 391 392 393
// legacyCidFromLink builds a version-0 CID from lnk's multihash. Only the
// hash is taken from the link.
func legacyCidFromLink(lnk *Link) *cid.Cid {
	h := lnk.Hash
	return cid.NewCidV0(h)
}

394 395 396
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
397 398 399 400 401 402 403
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
404
	for _, lnk := range links {
Jeromy's avatar
Jeromy committed
405 406
		c := legacyCidFromLink(lnk)
		if visit(c) {
407
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
408 409 410 411 412 413 414
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
415

416
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
417
	toprocess := make(chan []*cid.Cid, 8)
418
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
419 420 421 422 423

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

424
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
425

426 427 428 429 430
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

431
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
432 433 434 435
	live := 1

	for {
		select {
436
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
437 438 439
			if !ok {
				return nil
			}
440 441 442 443 444 445 446

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
447 448 449
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
450
			var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
451
			for _, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
452 453
				c := legacyCidFromLink(lnk)
				if visit(c) {
Jeromy's avatar
Jeromy committed
454
					live++
Jeromy's avatar
Jeromy committed
455
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
456 457 458 459 460 461 462
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
463
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
464
				select {
Jeromy's avatar
Jeromy committed
465
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
466 467 468 469 470 471 472 473 474 475
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
476
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
477 478 479 480 481 482 483
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
484

Jeromy's avatar
Jeromy committed
485
	get := func(ks []*cid.Cid) {
486 487 488
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
489
			select {
490 491
			case out <- opt:
			case <-ctx.Done():
492
				return
Jeromy's avatar
Jeromy committed
493 494 495 496 497
			}
		}
	}

	for ks := range in {
498
		wg.Add(1)
499
		go get(ks)
Jeromy's avatar
Jeromy committed
500 501
	}
}