merkledag.go 10.1 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	ipldcbor "gx/ipfs/QmRcAVqrbY5wryx7hfNLtiUZbCcstzaJL7YJFBboitcqWF/go-ipld-cbor"
Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
16 17
	node "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node"
	cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("merkledag")
21
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
22

Jeromy's avatar
Jeromy committed
23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
25 26 27
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
28 29 30

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
31
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
32 33

	Batch() *Batch
34 35

	LinkService
Jeromy's avatar
Jeromy committed
36 37
}

38
type LinkService interface {
39 40
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
41
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
42 43

	GetOfflineLinkService() LinkService
44 45
}

46
func NewDAGService(bs bserv.BlockService) *dagService {
47
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
48 49
}

50
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
51 52
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
53 54
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
55
type dagService struct {
56
	Blocks bserv.BlockService
57 58
}

59
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
61
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
62
		return nil, fmt.Errorf("dagService is nil")
63 64
	}

65
	return n.Blocks.AddBlock(nd)
66 67
}

68 69 70 71
// Batch returns a new, empty Batch that writes to this service. Batch.Add
// commits automatically once more than 8 MiB of raw node data is buffered.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 * 1024 * 1024
	return &Batch{
		ds:      n,
		MaxSize: defaultMaxSize,
	}
}

72
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
73
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
74
	if n == nil {
75
		return nil, fmt.Errorf("dagService is nil")
76
	}
Jeromy's avatar
Jeromy committed
77

78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, c)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
87 88
	}

89 90 91 92 93 94
	return decodeBlock(b)
}

func decodeBlock(b blocks.Block) (node.Node, error) {
	c := b.Cid()

Jeromy's avatar
Jeromy committed
95 96
	switch c.Type() {
	case cid.Protobuf:
97
		decnd, err := DecodeProtobuf(b.RawData())
Jeromy's avatar
Jeromy committed
98 99 100 101 102
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
103
		}
104 105 106 107 108

		decnd.cached = b.Cid()
		return decnd, nil
	case cid.Raw:
		return NewRawNode(b.RawData()), nil
109 110
	case cid.CBOR:
		return ipldcbor.Decode(b.RawData())
Jeromy's avatar
Jeromy committed
111
	default:
112
		return nil, fmt.Errorf("unrecognized object type: %s", c.Type())
113
	}
114
}
Jeromy's avatar
Jeromy committed
115

Jeromy's avatar
Jeromy committed
116
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
117 118 119 120
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
121
	return node.Links(), nil
122 123
}

124
func (n *dagService) GetOfflineLinkService() LinkService {
125 126
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
127 128 129 130 131 132
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
133
func (n *dagService) Remove(nd node.Node) error {
134
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
135 136
}

137
// FetchGraph fetches all nodes that are children of the given node
138 139
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
140
}
141

Jeromy's avatar
Jeromy committed
142 143
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
144
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
145
	var out []int
Jeromy's avatar
Jeromy committed
146 147
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
148
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
149 150
		}
	}
Jeromy's avatar
Jeromy committed
151
	return out
Jeromy's avatar
Jeromy committed
152 153
}

154
type NodeOption struct {
Jeromy's avatar
Jeromy committed
155
	Node node.Node
156 157 158
	Err  error
}

Jeromy's avatar
Jeromy committed
159
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
160
	out := make(chan *NodeOption, len(keys))
161
	blocks := ds.Blocks.GetBlocks(ctx, keys)
162 163
	var count int

164 165 166 167 168 169
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
170
					if count != len(keys) {
171
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
172
					}
173 174
					return
				}
Jeromy's avatar
Jeromy committed
175

176 177 178
				nd, err := decodeBlock(b)
				if err != nil {
					out <- &NodeOption{Err: err}
179 180
					return
				}
Jeromy's avatar
Jeromy committed
181

182
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
183 184
				count++

185
			case <-ctx.Done():
186
				out <- &NodeOption{Err: ctx.Err()}
187 188 189 190
				return
			}
		}
	}()
191
	return out
192 193
}

194
// GetDAG will fill out all of the links of the given Node.
195 196
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
197
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
198
	var cids []*cid.Cid
199 200
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
201 202
	}

Jeromy's avatar
Jeromy committed
203
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
204 205
}

Jeromy's avatar
Jeromy committed
206 207
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
208
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
209 210 211 212 213 214

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
215
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
216
	for i := range keys {
217
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
218 219
	}

220
	dedupedKeys := dedupeKeys(keys)
221
	go func() {
222 223 224
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

225
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
226

227
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
228
			select {
229
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
230
				if !ok {
Jeromy's avatar
Jeromy committed
231 232 233
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
234 235
					return
				}
Jeromy's avatar
Jeromy committed
236

237
				if opt.Err != nil {
238 239 240
					for _, p := range promises {
						p.Fail(opt.Err)
					}
241 242 243 244
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
245
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
246
				for _, i := range is {
247
					count++
248
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
249 250 251
				}
			case <-ctx.Done():
				return
252 253 254
			}
		}
	}()
Jeromy's avatar
Jeromy committed
255 256 257
	return promises
}

258
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
259 260 261 262
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
263
	}
Jeromy's avatar
Jeromy committed
264
	return set.Keys()
265 266
}

267
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
268
	return &nodePromise{
Jeromy's avatar
Jeromy committed
269
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
270
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
271
		err:  make(chan error, 1),
272
	}
Jeromy's avatar
Jeromy committed
273 274 275
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
276
	cache node.Node
277
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
278
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
279
	ctx   context.Context
Jeromy's avatar
Jeromy committed
280
	err   chan error
Jeromy's avatar
Jeromy committed
281 282
}

Jeromy's avatar
Jeromy committed
283 284 285 286
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
287
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
288
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
289
	Fail(err error)
Jeromy's avatar
Jeromy committed
290
	Send(node.Node)
Jeromy's avatar
Jeromy committed
291 292 293
}

func (np *nodePromise) Fail(err error) {
294 295 296 297 298 299 300 301 302
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
303
	np.err <- err
Jeromy's avatar
Jeromy committed
304 305
}

Jeromy's avatar
Jeromy committed
306
func (np *nodePromise) Send(nd node.Node) {
307 308
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
309
	if np.cache != nil {
310 311 312 313 314 315 316 317 318 319 320 321
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
322
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
323 324 325 326 327
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
328 329 330
	}

	select {
331 332
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
333 334
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
335 336
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
337 338
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
339
	}
340
}
341 342 343 344

type Batch struct {
	ds *dagService

345
	blocks  []blocks.Block
346 347 348 349
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
350
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
351
	t.blocks = append(t.blocks, nd)
352
	t.size += len(nd.RawData())
353
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
354
		return nd.Cid(), t.Commit()
355
	}
Jeromy's avatar
Jeromy committed
356
	return nd.Cid(), nil
357 358 359
}

func (t *Batch) Commit() error {
360 361
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
362 363 364
	t.size = 0
	return err
}
365 366 367 368

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
369 370 371 372 373 374 375
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
376
	for _, lnk := range links {
377
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
378
		if visit(c) {
379
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
380 381 382 383 384 385 386
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
387

388
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
389
	toprocess := make(chan []*cid.Cid, 8)
390
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
391 392 393 394 395

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

396
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
397

398 399 400 401 402
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

403
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
404 405 406 407
	live := 1

	for {
		select {
408
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
409 410 411
			if !ok {
				return nil
			}
412 413 414 415 416 417 418

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
419 420 421
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
422
			var cids []*cid.Cid
423 424
			for _, lnk := range nd.Links() {
				c := lnk.Cid
Jeromy's avatar
Jeromy committed
425
				if visit(c) {
Jeromy's avatar
Jeromy committed
426
					live++
Jeromy's avatar
Jeromy committed
427
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
428 429 430 431 432 433 434
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
435
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
436
				select {
Jeromy's avatar
Jeromy committed
437
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
438 439 440 441 442 443 444 445 446 447
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
448
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
449 450 451 452 453 454 455
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
456

Jeromy's avatar
Jeromy committed
457
	get := func(ks []*cid.Cid) {
458 459 460
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
461
			select {
462 463
			case out <- opt:
			case <-ctx.Done():
464
				return
Jeromy's avatar
Jeromy committed
465 466 467 468 469
			}
		}
	}

	for ks := range in {
470
		wg.Add(1)
471
		go get(ks)
Jeromy's avatar
Jeromy committed
472 473
	}
}