merkledag.go 10.1 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15
	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Jeromy's avatar
Jeromy committed
18
var log = logging.Logger("merkledag")
19
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
20

Jeromy's avatar
Jeromy committed
21 22
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
23 24
	Add(*Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (*Node, error)
Jeromy's avatar
Jeromy committed
25
	Remove(*Node) error
26 27 28

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
29
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
30 31

	Batch() *Batch
32 33

	LinkService
Jeromy's avatar
Jeromy committed
34 35
}

36
type LinkService interface {
37 38 39 40 41
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
	GetLinks(context.Context, *cid.Cid) ([]*Link, error)

	GetOfflineLinkService() LinkService
42 43
}

44
func NewDAGService(bs bserv.BlockService) *dagService {
45
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
46 47
}

48
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
49 50
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
51 52
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
53
type dagService struct {
54
	Blocks bserv.BlockService
55 56
}

57
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
58
func (n *dagService) Add(nd *Node) (*cid.Cid, error) {
59
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
60
		return nil, fmt.Errorf("dagService is nil")
61 62
	}

63
	return n.Blocks.AddBlock(nd)
64 65
}

66 67 68 69
// Batch returns a new Batch that writes to this service. The batch commits
// automatically once the encoded size of its buffered nodes exceeds MaxSize,
// which starts at 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 MiB
	b := &Batch{ds: n}
	b.MaxSize = defaultMaxSize
	return b
}

70
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
71
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
72
	if n == nil {
73
		return nil, fmt.Errorf("dagService is nil")
74
	}
Jeromy's avatar
Jeromy committed
75

76 77
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
78

Jeromy's avatar
Jeromy committed
79
	b, err := n.Blocks.GetBlock(ctx, c)
80
	if err != nil {
81 82 83
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
84
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
85 86
	}

Jeromy's avatar
Jeromy committed
87 88 89 90 91 92 93 94 95
	var res *Node
	switch c.Type() {
	case cid.Protobuf:
		out, err := DecodeProtobuf(b.RawData())
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
96
		}
Jeromy's avatar
Jeromy committed
97 98 99
		res = out
	default:
		return nil, fmt.Errorf("unrecognized formatting type")
100
	}
101

Jeromy's avatar
Jeromy committed
102
	res.cached = c
103

104
	return res, nil
105
}
Jeromy's avatar
Jeromy committed
106

107 108 109 110 111 112 113 114
// GetLinks fetches the node identified by c via Get and returns its links.
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
	nd, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
	links := nd.Links
	return links, nil
}

115
func (n *dagService) GetOfflineLinkService() LinkService {
116 117
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
118 119 120 121 122 123
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
124
func (n *dagService) Remove(nd *Node) error {
125
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
126 127
}

128
// FetchGraph fetches all nodes that are children of the given node
129 130
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
131
}
132

Jeromy's avatar
Jeromy committed
133 134
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
135
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
136
	var out []int
Jeromy's avatar
Jeromy committed
137 138
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
139
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
140 141
		}
	}
Jeromy's avatar
Jeromy committed
142
	return out
Jeromy's avatar
Jeromy committed
143 144
}

145 146 147 148 149
// NodeOption is the result of fetching a single node. Producers in this
// package set either Node or Err, not both.
type NodeOption struct {
	Node *Node // the fetched node, on success
	Err  error // the failure, otherwise
}

Jeromy's avatar
Jeromy committed
150
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
151
	out := make(chan *NodeOption, len(keys))
152
	blocks := ds.Blocks.GetBlocks(ctx, keys)
153 154
	var count int

155 156 157 158 159 160
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
161
					if count != len(keys) {
162
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
163
					}
164 165
					return
				}
Jeromy's avatar
Jeromy committed
166

167
				c := b.Cid()
Jeromy's avatar
Jeromy committed
168 169 170 171 172 173 174 175 176 177 178 179 180

				var nd *Node
				switch c.Type() {
				case cid.Protobuf:
					decnd, err := DecodeProtobuf(b.RawData())
					if err != nil {
						out <- &NodeOption{Err: err}
						return
					}
					decnd.cached = cid.NewCidV0(b.Multihash())
					nd = decnd
				default:
					out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
181 182
					return
				}
Jeromy's avatar
Jeromy committed
183 184

				// buffered, no need to select
185
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
186 187
				count++

188
			case <-ctx.Done():
189
				out <- &NodeOption{Err: ctx.Err()}
190 191 192 193
				return
			}
		}
	}()
194
	return out
195 196
}

197
// GetDAG will fill out all of the links of the given Node.
198 199
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
200
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
201
	var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
202
	for _, lnk := range root.Links {
Jeromy's avatar
Jeromy committed
203
		cids = append(cids, cid.NewCidV0(lnk.Hash))
Jeromy's avatar
Jeromy committed
204 205
	}

Jeromy's avatar
Jeromy committed
206
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
207 208
}

Jeromy's avatar
Jeromy committed
209 210
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
211
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
212 213 214 215 216 217

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
218
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
219
	for i := range keys {
220
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
221 222
	}

223
	dedupedKeys := dedupeKeys(keys)
224
	go func() {
225 226 227
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

228
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
229

230
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
231
			select {
232
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
233
				if !ok {
Jeromy's avatar
Jeromy committed
234 235 236
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
237 238
					return
				}
Jeromy's avatar
Jeromy committed
239

240
				if opt.Err != nil {
241 242 243
					for _, p := range promises {
						p.Fail(opt.Err)
					}
244 245 246 247
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
248
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
249
				for _, i := range is {
250
					count++
251
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
252 253 254
				}
			case <-ctx.Done():
				return
255 256 257
			}
		}
	}()
Jeromy's avatar
Jeromy committed
258 259 260
	return promises
}

261
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
262 263 264 265
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
266
	}
Jeromy's avatar
Jeromy committed
267
	return set.Keys()
268 269
}

270
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
271
	return &nodePromise{
272
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
273
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
274
		err:  make(chan error, 1),
275
	}
Jeromy's avatar
Jeromy committed
276 277 278 279
}

type nodePromise struct {
	cache *Node
280 281
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
282
	ctx   context.Context
Jeromy's avatar
Jeromy committed
283
	err   chan error
Jeromy's avatar
Jeromy committed
284 285
}

Jeromy's avatar
Jeromy committed
286 287 288 289
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
290
type NodeGetter interface {
291
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
292
	Fail(err error)
293
	Send(*Node)
Jeromy's avatar
Jeromy committed
294 295 296
}

func (np *nodePromise) Fail(err error) {
297 298 299 300 301 302 303 304 305
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
306
	np.err <- err
Jeromy's avatar
Jeromy committed
307 308
}

309 310 311
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
312
	if np.cache != nil {
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
331 332 333
	}

	select {
334 335
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
336 337
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
338 339
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
340 341
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
342
	}
343
}
344 345 346 347

type Batch struct {
	ds *dagService

348
	blocks  []blocks.Block
349 350 351 352
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
353
func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
354
	d, err := nd.EncodeProtobuf(false)
355
	if err != nil {
Jeromy's avatar
Jeromy committed
356
		return nil, err
357 358
	}

359
	t.blocks = append(t.blocks, nd)
Jeromy's avatar
Jeromy committed
360
	t.size += len(d)
361
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
362
		return nd.Cid(), t.Commit()
363
	}
Jeromy's avatar
Jeromy committed
364
	return nd.Cid(), nil
365 366 367
}

func (t *Batch) Commit() error {
368 369
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
370 371 372
	t.size = 0
	return err
}
373

Jeromy's avatar
Jeromy committed
374 375 376 377
// legacyCidFromLink wraps lnk.Hash in a version-0 CID.
func legacyCidFromLink(lnk *Link) *cid.Cid {
	h := lnk.Hash
	return cid.NewCidV0(h)
}

378 379 380
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
381 382 383 384 385 386 387
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
388
	for _, lnk := range links {
Jeromy's avatar
Jeromy committed
389 390
		c := legacyCidFromLink(lnk)
		if visit(c) {
391
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
392 393 394 395 396 397 398
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
399

400
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
401
	toprocess := make(chan []*cid.Cid, 8)
402
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
403 404 405 406 407

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

408
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
409

410 411 412 413 414
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

415
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
416 417 418 419
	live := 1

	for {
		select {
420
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
421 422 423
			if !ok {
				return nil
			}
424 425 426 427 428 429 430

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
431 432 433
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
434
			var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
435
			for _, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
436 437
				c := legacyCidFromLink(lnk)
				if visit(c) {
Jeromy's avatar
Jeromy committed
438
					live++
Jeromy's avatar
Jeromy committed
439
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
440 441 442 443 444 445 446
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
447
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
448
				select {
Jeromy's avatar
Jeromy committed
449
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
450 451 452 453 454 455 456 457 458 459
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
460
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
461 462 463 464 465 466 467
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
468

Jeromy's avatar
Jeromy committed
469
	get := func(ks []*cid.Cid) {
470 471 472
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
473
			select {
474 475
			case out <- opt:
			case <-ctx.Done():
476
				return
Jeromy's avatar
Jeromy committed
477 478 479 480 481
			}
		}
	}

	for ks := range in {
482
		wg.Add(1)
483
		go get(ks)
Jeromy's avatar
Jeromy committed
484 485
	}
}