merkledag.go 10.1 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"strings"
8
	"sync"
Jeromy's avatar
Jeromy committed
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
12
	offline "github.com/ipfs/go-ipfs/exchange/offline"
Jeromy's avatar
Jeromy committed
13

Jeromy's avatar
Jeromy committed
14
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
15
	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
Jeromy's avatar
Jeromy committed
16
	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18
)

Jeromy's avatar
Jeromy committed
19
var log = logging.Logger("merkledag")
20
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
21

Jeromy's avatar
Jeromy committed
22 23
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
24 25 26
	Add(node.Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (node.Node, error)
	Remove(node.Node) error
27 28 29

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
30
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
31 32

	Batch() *Batch
33 34

	LinkService
Jeromy's avatar
Jeromy committed
35 36
}

37
type LinkService interface {
38 39
	// Return all links for a node, may be more effect than
	// calling Get in DAGService
Jeromy's avatar
Jeromy committed
40
	GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
41 42

	GetOfflineLinkService() LinkService
43 44
}

45
func NewDAGService(bs bserv.BlockService) *dagService {
46
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
47 48
}

49
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
50 51
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
52 53
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
54
type dagService struct {
55
	Blocks bserv.BlockService
56 57
}

58
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
59
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
60
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
61
		return nil, fmt.Errorf("dagService is nil")
62 63
	}

64
	return n.Blocks.AddBlock(nd)
65 66
}

67 68 69 70
// Batch returns a new, empty Batch bound to this service. Its MaxSize is
// set to 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 MiB

	b := &Batch{ds: n}
	b.MaxSize = defaultMaxSize
	return b
}

71
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
72
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
73
	if n == nil {
74
		return nil, fmt.Errorf("dagService is nil")
75
	}
Jeromy's avatar
Jeromy committed
76

77 78
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
79

Jeromy's avatar
Jeromy committed
80
	b, err := n.Blocks.GetBlock(ctx, c)
81
	if err != nil {
82 83 84
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
85
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
86 87
	}

Jeromy's avatar
Jeromy committed
88
	var res node.Node
Jeromy's avatar
Jeromy committed
89 90 91 92 93 94 95 96
	switch c.Type() {
	case cid.Protobuf:
		out, err := DecodeProtobuf(b.RawData())
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
97
		}
98
		out.cached = c
Jeromy's avatar
Jeromy committed
99 100 101
		res = out
	default:
		return nil, fmt.Errorf("unrecognized formatting type")
102
	}
103

104
	return res, nil
105
}
Jeromy's avatar
Jeromy committed
106

Jeromy's avatar
Jeromy committed
107
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
108 109 110 111
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
112
	return node.Links(), nil
113 114
}

115
func (n *dagService) GetOfflineLinkService() LinkService {
116 117
	if n.Blocks.Exchange().IsOnline() {
		bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
118 119 120 121 122 123
		return NewDAGService(bsrv)
	} else {
		return n
	}
}

Jeromy's avatar
Jeromy committed
124
func (n *dagService) Remove(nd node.Node) error {
125
	return n.Blocks.DeleteBlock(nd)
Jeromy's avatar
Jeromy committed
126 127
}

128
// FetchGraph fetches all nodes that are children of the given node
129 130
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
	return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
131
}
132

Jeromy's avatar
Jeromy committed
133 134
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
135
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
136
	var out []int
Jeromy's avatar
Jeromy committed
137 138
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
139
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
140 141
		}
	}
Jeromy's avatar
Jeromy committed
142
	return out
Jeromy's avatar
Jeromy committed
143 144
}

145
type NodeOption struct {
Jeromy's avatar
Jeromy committed
146
	Node node.Node
147 148 149
	Err  error
}

Jeromy's avatar
Jeromy committed
150
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
151
	out := make(chan *NodeOption, len(keys))
152
	blocks := ds.Blocks.GetBlocks(ctx, keys)
153 154
	var count int

155 156 157 158 159 160
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
161
					if count != len(keys) {
162
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
163
					}
164 165
					return
				}
Jeromy's avatar
Jeromy committed
166

167
				c := b.Cid()
Jeromy's avatar
Jeromy committed
168

Jeromy's avatar
Jeromy committed
169
				var nd node.Node
Jeromy's avatar
Jeromy committed
170 171 172 173 174 175 176
				switch c.Type() {
				case cid.Protobuf:
					decnd, err := DecodeProtobuf(b.RawData())
					if err != nil {
						out <- &NodeOption{Err: err}
						return
					}
177
					decnd.cached = b.Cid()
Jeromy's avatar
Jeromy committed
178 179 180
					nd = decnd
				default:
					out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
181 182
					return
				}
Jeromy's avatar
Jeromy committed
183 184

				// buffered, no need to select
185
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
186 187
				count++

188
			case <-ctx.Done():
189
				out <- &NodeOption{Err: ctx.Err()}
190 191 192 193
				return
			}
		}
	}()
194
	return out
195 196
}

197
// GetDAG will fill out all of the links of the given Node.
198 199
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
200
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
201
	var cids []*cid.Cid
202 203
	for _, lnk := range root.Links() {
		cids = append(cids, lnk.Cid)
Jeromy's avatar
Jeromy committed
204 205
	}

Jeromy's avatar
Jeromy committed
206
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
207 208
}

Jeromy's avatar
Jeromy committed
209 210
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
211
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
212 213 214 215 216 217

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
218
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
219
	for i := range keys {
220
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
221 222
	}

223
	dedupedKeys := dedupeKeys(keys)
224
	go func() {
225 226 227
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

228
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
229

230
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
231
			select {
232
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
233
				if !ok {
Jeromy's avatar
Jeromy committed
234 235 236
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
237 238
					return
				}
Jeromy's avatar
Jeromy committed
239

240
				if opt.Err != nil {
241 242 243
					for _, p := range promises {
						p.Fail(opt.Err)
					}
244 245 246 247
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
248
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
249
				for _, i := range is {
250
					count++
251
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
252 253 254
				}
			case <-ctx.Done():
				return
255 256 257
			}
		}
	}()
Jeromy's avatar
Jeromy committed
258 259 260
	return promises
}

261
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
262 263 264 265
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
266
	}
Jeromy's avatar
Jeromy committed
267
	return set.Keys()
268 269
}

270
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
271
	return &nodePromise{
Jeromy's avatar
Jeromy committed
272
		recv: make(chan node.Node, 1),
Jeromy's avatar
Jeromy committed
273
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
274
		err:  make(chan error, 1),
275
	}
Jeromy's avatar
Jeromy committed
276 277 278
}

type nodePromise struct {
Jeromy's avatar
Jeromy committed
279
	cache node.Node
280
	clk   sync.Mutex
Jeromy's avatar
Jeromy committed
281
	recv  chan node.Node
Jeromy's avatar
Jeromy committed
282
	ctx   context.Context
Jeromy's avatar
Jeromy committed
283
	err   chan error
Jeromy's avatar
Jeromy committed
284 285
}

Jeromy's avatar
Jeromy committed
286 287 288 289
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
290
type NodeGetter interface {
Jeromy's avatar
Jeromy committed
291
	Get(context.Context) (node.Node, error)
Jeromy's avatar
Jeromy committed
292
	Fail(err error)
Jeromy's avatar
Jeromy committed
293
	Send(node.Node)
Jeromy's avatar
Jeromy committed
294 295 296
}

func (np *nodePromise) Fail(err error) {
297 298 299 300 301 302 303 304 305
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
306
	np.err <- err
Jeromy's avatar
Jeromy committed
307 308
}

Jeromy's avatar
Jeromy committed
309
func (np *nodePromise) Send(nd node.Node) {
310 311
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
312
	if np.cache != nil {
313 314 315 316 317 318 319 320 321 322 323 324
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

Jeromy's avatar
Jeromy committed
325
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
326 327 328 329 330
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
331 332 333
	}

	select {
334 335
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
336 337
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
338 339
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
340 341
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
342
	}
343
}
344 345 346 347

type Batch struct {
	ds *dagService

348
	blocks  []blocks.Block
349 350 351 352
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
353
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
354
	t.blocks = append(t.blocks, nd)
355
	t.size += len(nd.RawData())
356
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
357
		return nd.Cid(), t.Commit()
358
	}
Jeromy's avatar
Jeromy committed
359
	return nd.Cid(), nil
360 361 362
}

func (t *Batch) Commit() error {
363 364
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
365 366 367
	t.size = 0
	return err
}
368 369 370 371

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
372 373 374 375 376 377 378
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
	links, err := ds.GetLinks(ctx, root)
	if bestEffort && err == ErrNotFound {
		return nil
	} else if err != nil {
		return err
	}
379
	for _, lnk := range links {
380
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
381
		if visit(c) {
382
			err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
383 384 385 386 387 388 389
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
390

391
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
Jeromy's avatar
Jeromy committed
392
	toprocess := make(chan []*cid.Cid, 8)
393
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
394 395 396 397 398

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

399
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
400

401 402 403 404 405
	root, err := ds.Get(ctx, c)
	if err != nil {
		return err
	}

406
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
407 408 409 410
	live := 1

	for {
		select {
411
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
412 413 414
			if !ok {
				return nil
			}
415 416 417 418 419 420 421

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
422 423 424
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
425
			var cids []*cid.Cid
426 427
			for _, lnk := range nd.Links() {
				c := lnk.Cid
Jeromy's avatar
Jeromy committed
428
				if visit(c) {
Jeromy's avatar
Jeromy committed
429
					live++
Jeromy's avatar
Jeromy committed
430
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
431 432 433 434 435 436 437
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
438
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
439
				select {
Jeromy's avatar
Jeromy committed
440
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
441 442 443 444 445 446 447 448 449 450
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
451
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
452 453 454 455 456 457 458
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
459

Jeromy's avatar
Jeromy committed
460
	get := func(ks []*cid.Cid) {
461 462 463
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
464
			select {
465 466
			case out <- opt:
			case <-ctx.Done():
467
				return
Jeromy's avatar
Jeromy committed
468 469 470 471 472
			}
		}
	}

	for ks := range in {
473
		wg.Add(1)
474
		go get(ks)
Jeromy's avatar
Jeromy committed
475 476
	}
}