merkledag.go 10.3 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15 16 17
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	ipldcbor "gx/ipfs/QmWcQMNruWC3wphK1L6zEcV4MZBJqfsNKSRFcuo4AsNk4k/go-ipld-cbor"
	node "gx/ipfs/QmYDscK7dmdo2GZ9aumS8s5auUUAH5mR1jvj5pYhWusfK7/go-ipld-node"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
// log is the package-level logger, registered under the name "merkledag".
var log = logging.Logger("merkledag")

// ErrNotFound is the sentinel error this package returns when a requested
// node is not available: Get maps the block service's not-found error to it,
// and GetNodes fails outstanding promises with it when the result stream
// closes early.
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	// Add stores the given node and returns its CID.
	Add(node.Node) (*cid.Cid, error)
	// Get retrieves the node identified by the given CID.
	Get(context.Context, *cid.Cid) (node.Node, error)
	// Remove deletes the given node from the service.
	Remove(node.Node) error

	// GetMany fetches the nodes for the given CIDs and delivers each
	// result (a node or an error) on the returned channel.
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption

	// Batch returns a Batch that can be used to add many nodes in bulk.
	Batch() *Batch

	LinkService
}

38
// LinkService gives access to the links of DAG nodes without requiring the
// caller to handle the nodes themselves.
type LinkService interface {
	// GetLinks returns all links for a node. It may be more efficient
	// than calling Get in DAGService.
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)

	// GetOfflineLinkService returns a LinkService intended for offline
	// use. The dagService implementation in this file returns a service
	// backed by an offline exchange over its own blockstore when its
	// exchange is online, and itself otherwise.
	GetOfflineLinkService() LinkService
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
type dagService struct {
	// Blocks is the block service every node is written to, read from and
	// deleted from.
	Blocks bserv.BlockService
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68 69 70 71
// Batch returns a new Batch bound to this service. Its MaxSize is set to
// 8 MiB; Batch.Add commits automatically once the accumulated raw node data
// exceeds that threshold.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 * 1024 * 1024

	b := &Batch{ds: n}
	b.MaxSize = defaultMaxSize
	return b
}

72
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
73
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
74
	if n == nil {
75
		return nil, fmt.Errorf("dagService is nil")
76
	}
Jeromy's avatar
Jeromy committed
77

78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, c)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
87 88
	}

89 90 91 92 93 94
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
95
	switch c.Type() {
Jeromy's avatar
Jeromy committed
96
	case cid.DagProtobuf:
97
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
98 99 100 101 102
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
103
		}
104 105

		decnd.cached = b.Cid()
106
		decnd.Prefix = b.Cid().Prefix()
107 108 109
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
110
	case cid.DagCBOR:
111
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
112
	default:
113
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
114
	}
115
}
Jeromy's avatar
Jeromy committed
116

Jeromy's avatar
Jeromy committed
117
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
118 119 120
	if c.Type() == cid.Raw {
		return nil, nil
	}
121 122 123 124
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
125
	return node.Links(), nil
126 127
}

128
func (n *dagService) GetOfflineLinkService() LinkService {
129 130
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
131 132 133 134 135 136
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
137
func (n *dagService) Remove(nd node.Node) error {
138
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
139 140
}

141
// FetchGraph fetches all nodes that are children of the given node
142
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
143
	return EnumerateChildren(ctx, serv, c, cid.NewSet().Visit, false)
Jeromy's avatar
Jeromy committed
144
}
145

Jeromy's avatar
Jeromy committed
146 147
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
148
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
149
	var out []int
Jeromy's avatar
Jeromy committed
150 151
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
152
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
153 154
		}
	}
Jeromy's avatar
Jeromy committed
155
	return out
Jeromy's avatar
Jeromy committed
156 157
}

158
// NodeOption is a single result delivered on the channel returned by GetMany:
// either a successfully decoded node or the error that ended the stream.
type NodeOption struct {
	// Node is the decoded node; set when Err is nil.
	Node node.Node
	// Err is the failure being reported; when set, Node is left unset.
	Err error
}

Jeromy's avatar
Jeromy committed
163
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
164
	out := make(chan *NodeOption, len(keys))
165
	blocks := ds.Blocks.GetBlocks(ctx, keys)
166 167
	var count int

168 169 170 171 172 173
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
174
					if count != len(keys) {
175
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
176
					}
177 178
					return
				}
Jeromy's avatar
Jeromy committed
179

180 181 182
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
183 184
					return
				}
Jeromy's avatar
Jeromy committed
185

186
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
187 188
				count++

189
			case <-ctx.Done():
190
				out <- &NodeOption{Err: ctx.Err()}
191 192 193 194
				return
			}
		}
	}()
195
	return out
196 197
}

198
// GetDAG will fill out all of the links of the given Node.
199 200
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
201
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
202
	var cids []*cid.Cid
203 204
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
205 206
	}

Jeromy's avatar
Jeromy committed
207
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
208 209
}

Jeromy's avatar
Jeromy committed
210 211
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
212
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
213 214 215 216 217 218

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
219
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
220
	for i := range keys {
221
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
222 223
	}

224
	dedupedKeys := dedupeKeys(keys)
225
	go func() {
226 227 228
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

229
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
230

231
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
232
			select {
233
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
234
				if !ok {
Jeromy's avatar
Jeromy committed
235 236 237
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
238 239
					return
				}
Jeromy's avatar
Jeromy committed
240

241
				if opt.Err != nil {
242 243 244
					for _, p := range promises {
						p.Fail(opt.Err)
					}
245 246 247 248
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
249
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
250
				for _, i := range is {
251
					count++
252
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
253 254 255
				}
			case <-ctx.Done():
				return
256 257 258
			}
		}
	}()
Jeromy's avatar
Jeromy committed
259 260 261
	return promises
}

262
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
263 264 265 266
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
267
	}
Jeromy's avatar
Jeromy committed
268
	return set.Keys()
269 270
}

271
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
272
	return &nodePromise{
Jeromy's avatar
Jeromy committed
273
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
274
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
275
		err:  make(chan error, 1),
276
	}
Jeromy's avatar
Jeromy committed
277 278 279
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	// cache holds the node once Send has been called; guarded by clk.
	cache node.Node
	// clk protects cache.
	clk sync.Mutex
	// recv carries the node from Send to a waiting Get.
	recv chan node.Node
	// ctx is the context the promise was created with; Get gives up when
	// it is done.
	ctx context.Context
	// err carries the error from Fail to a waiting Get.
	err chan error
}

Jeromy's avatar
Jeromy committed
287 288 289 290
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type NodeGetter interface {
	// Get waits for the promised node, an error, or cancellation of the
	// given context.
	Get(context.Context) (node.Node, error)
	// Fail reports that the node could not be obtained.
	Fail(err error)
	// Send fulfils the promise with the given node.
	Send(node.Node)
}

func (np *nodePromise) Fail(err error) {
298 299 300 301 302 303 304 305 306
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
307
	np.err <- err
Jeromy's avatar
Jeromy committed
308 309
}

Jeromy's avatar
Jeromy committed
310
func (np *nodePromise) Send(nd node.Node) {
311 312
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
313
	if np.cache != nil {
314 315 316 317 318 319 320 321 322 323 324 325
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
326
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
327 328 329 330 331
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
332 333 334
	}

	select {
335 336
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
337 338
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
339 340
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
341 342
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
343
	}
344
}
345 346 347 348

// Batch accumulates nodes in memory and writes them to the owning
// dagService's block service in bulk when committed.
type Batch struct {
	// ds is the service whose block service receives the committed blocks.
	ds *dagService

	// blocks are the nodes added since the last commit.
	blocks []blocks.Block
	// size is the total length of the raw data of the pending blocks.
	size int
	// MaxSize is the pending-size threshold; Add commits once size
	// exceeds it.
	MaxSize int
}

Jeromy's avatar
Jeromy committed
354
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
355
	t.blocks = append(t.blocks, nd)
356
	t.size += len(nd.RawData())
357
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
358
		return nd.Cid(), t.Commit()
359
	}
Jeromy's avatar
Jeromy committed
360
	return nd.Cid(), nil
361 362 363
}

func (t *Batch) Commit() error {
364 365
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
366 367 368
	t.size = 0
	return err
}
369 370 371 372

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
373 374 375 376 377 378 379
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
380
	for _, lnk := range links {
381
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
382
		if visit(c) {
383
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
384 385 386 387 388 389 390
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
391

392 393 394
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync starts to fetch child nodes concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
395

396 397 398 399
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
	if !visit(c) {
		return nil
	}
Jeromy's avatar
Jeromy committed
400

401 402 403 404 405
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
	feed := make(chan node.Node)
	out := make(chan *NodeOption)
	done := make(chan struct{})

	var setlk sync.Mutex

	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
			for n := range feed {
				links := n.Links()
				cids := make([]*cid.Cid, 0, len(links))
				for _, l := range links {
					setlk.Lock()
					unseen := visit(l.Cid)
					setlk.Unlock()
					if unseen {
						cids = append(cids, l.Cid)
					}
Jeromy's avatar
Jeromy committed
424 425
				}

426 427 428 429 430 431 432
				for nopt := range ds.GetMany(ctx, cids) {
					select {
					case out <- nopt:
					case <-ctx.Done():
						return
					}
				}
Jeromy's avatar
Jeromy committed
433
				select {
434
				case done <- struct{}{}:
Jeromy's avatar
Jeromy committed
435 436 437
				case <-ctx.Done():
				}
			}
438
		}()
Jeromy's avatar
Jeromy committed
439
	}
440
	defer close(feed)
Jeromy's avatar
Jeromy committed
441

442 443 444
	send := feed
	var todobuffer []node.Node
	var inProgress int
Jeromy's avatar
Jeromy committed
445

446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
	next := root
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
		case nc := <-out:
			if nc.Err != nil {
				return nc.Err
			}
467

468 469 470 471 472
			if next == nil {
				next = nc.Node
				send = feed
			} else {
				todobuffer = append(todobuffer, nc.Node)
Jeromy's avatar
Jeromy committed
473 474
			}

475
		case <-ctx.Done():
476
			return ctx.Err()
477
		}
Jeromy's avatar
Jeromy committed
478
	}
479

Jeromy's avatar
Jeromy committed
480
}