// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	node "gx/ipfs/QmRSU5EqqWVZSNdbU51yXmVoF1uNw3JgTNB6RaiL7DZM16/go-ipld-node"
Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Jeromy's avatar
Jeromy committed
16 17
	ipldcbor "gx/ipfs/QmbuuwTd9x4NReZ7sxtiKk7wFcfDUo54MfWBdtF5MRCPGR/go-ipld-cbor"
	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68 69 70 71
// Batch returns a new Batch that writes through this dagService and has its
// MaxSize set to 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 MiB

	return &Batch{ds: n, MaxSize: defaultMaxSize}
}

72
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
73
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
74
	if n == nil {
75
		return nil, fmt.Errorf("dagService is nil")
76
	}
Jeromy's avatar
Jeromy committed
77

78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, c)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
87 88
	}

89 90 91 92 93 94
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
95
	switch c.Type() {
Jeromy's avatar
Jeromy committed
96
	case cid.DagProtobuf:
97
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
98 99 100 101 102
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
103
		}
104 105

		decnd.cached = b.Cid()
106
		decnd.Prefix = b.Cid().Prefix()
107 108 109
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
Jeromy's avatar
Jeromy committed
110
	case cid.DagCBOR:
111
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
112
	default:
113
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
114
	}
115
}
Jeromy's avatar
Jeromy committed
116

Jeromy's avatar
Jeromy committed
117
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
118 119 120
	if c.Type() == cid.Raw {
		return nil, nil
	}
121 122 123 124
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
125
	return node.Links(), nil
126 127
}

128
func (n *dagService) GetOfflineLinkService() LinkService {
129 130
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
131 132 133 134 135 136
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
137
func (n *dagService) Remove(nd node.Node) error {
138
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
139 140
}

141
// FetchGraph fetches all nodes that are children of the given node
142 143
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
144
}
145

Jeromy's avatar
Jeromy committed
146 147
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
148
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
149
	var out []int
Jeromy's avatar
Jeromy committed
150 151
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
152
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
153 154
		}
	}
Jeromy's avatar
Jeromy committed
155
	return out
Jeromy's avatar
Jeromy committed
156 157
}

158
type NodeOption struct {
Jeromy's avatar
Jeromy committed
159
	Node node.Node
160 161 162
	Err  error
}

Jeromy's avatar
Jeromy committed
163
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
164
	out := make(chan *NodeOption, len(keys))
165
	blocks := ds.Blocks.GetBlocks(ctx, keys)
166 167
	var count int

168 169 170 171 172 173
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
174
					if count != len(keys) {
175
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
176
					}
177 178
					return
				}
Jeromy's avatar
Jeromy committed
179

180 181 182
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
183 184
					return
				}
Jeromy's avatar
Jeromy committed
185

186
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
187 188
				count++

189
			case <-ctx.Done():
190
				out <- &NodeOption{Err: ctx.Err()}
191 192 193 194
				return
			}
		}
	}()
195
	return out
196 197
}

198
// GetDAG will fill out all of the links of the given Node.
199 200
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
201
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
202
	var cids []*cid.Cid
203 204
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
205 206
	}

Jeromy's avatar
Jeromy committed
207
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
208 209
}

Jeromy's avatar
Jeromy committed
210 211
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
212
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
213 214 215 216 217 218

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
219
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
220
	for i := range keys {
221
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
222 223
	}

224
	dedupedKeys := dedupeKeys(keys)
225
	go func() {
226 227 228
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

229
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
230

231
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
232
			select {
233
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
234
				if !ok {
Jeromy's avatar
Jeromy committed
235 236 237
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
238 239
					return
				}
Jeromy's avatar
Jeromy committed
240

241
				if opt.Err != nil {
242 243 244
					for _, p := range promises {
						p.Fail(opt.Err)
					}
245 246 247 248
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
249
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
250
				for _, i := range is {
251
					count++
252
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
253 254 255
				}
			case <-ctx.Done():
				return
256 257 258
			}
		}
	}()
Jeromy's avatar
Jeromy committed
259 260 261
	return promises
}

262
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
263 264 265 266
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
267
	}
Jeromy's avatar
Jeromy committed
268
	return set.Keys()
269 270
}

271
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
272
	return &nodePromise{
Jeromy's avatar
Jeromy committed
273
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
274
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
275
		err:  make(chan error, 1),
276
	}
Jeromy's avatar
Jeromy committed
277 278 279
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
280
	cache node.Node
281
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
282
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
283
	ctx   context.Context
Jeromy's avatar
Jeromy committed
284
	err   chan error
Jeromy's avatar
Jeromy committed
285 286
}

Jeromy's avatar
Jeromy committed
287 288 289 290
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
291
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
292
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
293
	Fail(err error)
Jeromy's avatar
Jeromy committed
294
	Send(node.Node)
Jeromy's avatar
Jeromy committed
295 296 297
}

func (np *nodePromise) Fail(err error) {
298 299 300 301 302 303 304 305 306
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
307
	np.err <- err
Jeromy's avatar
Jeromy committed
308 309
}

Jeromy's avatar
Jeromy committed
310
func (np *nodePromise) Send(nd node.Node) {
311 312
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
313
	if np.cache != nil {
314 315 316 317 318 319 320 321 322 323 324 325
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
326
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
327 328 329 330 331
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
332 333 334
	}

	select {
335 336
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
337 338
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
339 340
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
341 342
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
343
	}
344
}
345 346 347 348

type Batch struct {
	ds *dagService

349
	blocks  []blocks.Block
350 351 352 353
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
354
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
355
	t.blocks = append(t.blocks, nd)
356
	t.size += len(nd.RawData())
357
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
358
		return nd.Cid(), t.Commit()
359
	}
Jeromy's avatar
Jeromy committed
360
	return nd.Cid(), nil
361 362 363
}

func (t *Batch) Commit() error {
364 365
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
366 367 368
	t.size = 0
	return err
}
369 370 371 372

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
373 374 375 376 377 378 379
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
380
	for _, lnk := range links {
381
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
382
		if visit(c) {
383
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
384 385 386 387 388 389 390
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
391

392
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
393
	toprocess := make(chan []*cid.Cid, 8)
394
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
395 396 397 398 399

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

400
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
401

402 403 404 405 406
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

407
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
408 409 410 411
	live := 1

	for {
		select {
412
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
413 414 415
			if !ok {
				return nil
			}
416 417 418 419 420 421 422

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
423 424 425
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
426
			var cids []*cid.Cid
427 428
			for _, lnk := range nd.Links() {
				c := lnk.Cid
Jeromy's avatar
Jeromy committed
429
				if visit(c) {
Jeromy's avatar
Jeromy committed
430
					live++
Jeromy's avatar
Jeromy committed
431
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
432 433 434 435 436 437 438
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
439
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
440
				select {
Jeromy's avatar
Jeromy committed
441
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
442 443 444 445 446 447 448 449 450 451
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

452 453 454 455
// FetchGraphConcurrency is the maximum number of concurrent fetches that
// 'fetchNodes' will have in flight at a time.
var FetchGraphConcurrency int = 8

Jeromy's avatar
Jeromy committed
456
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
457 458 459 460 461 462 463
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
464

465 466
	rateLimit := make(chan struct{}, FetchGraphConcurrency)

Jeromy's avatar
Jeromy committed
467
	get := func(ks []*cid.Cid) {
468
		defer wg.Done()
469 470 471
		defer func() {
			<-rateLimit
		}()
472 473
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
474
			select {
475 476
			case out <- opt:
			case <-ctx.Done():
477
				return
Jeromy's avatar
Jeromy committed
478 479 480 481 482
			}
		}
	}

	for ks := range in {
483 484 485 486 487
		select {
		case rateLimit <- struct{}{}:
		case <-ctx.Done():
			return
		}
488
		wg.Add(1)
489
		go get(ks)
Jeromy's avatar
Jeromy committed
490 491
	}
}