merkledag.go 10 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"strings"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
10
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Jeromy's avatar
Jeromy committed
11

12
	"context"
Jeromy's avatar
Jeromy committed
13
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
14
	cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

Jeromy's avatar
Jeromy committed
17
var log = logging.Logger("merkledag")
18
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
19

Jeromy's avatar
Jeromy committed
20 21
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Jeromy's avatar
Jeromy committed
22 23
	Add(*Node) (*cid.Cid, error)
	Get(context.Context, *cid.Cid) (*Node, error)
Jeromy's avatar
Jeromy committed
24
	Remove(*Node) error
25 26 27

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
28
	GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
29 30

	Batch() *Batch
Jeromy's avatar
Jeromy committed
31 32 33 34 35 36
}

// NewDAGService returns a DAGService that stores and retrieves its nodes
// through the given BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{Blocks: bs}
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks *bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
Jeromy's avatar
Jeromy committed
47
func (n *dagService) Add(nd *Node) (*cid.Cid, error) {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
Jeromy's avatar
Jeromy committed
49
		return nil, fmt.Errorf("dagService is nil")
50 51
	}

Jeromy's avatar
Jeromy committed
52
	return n.Blocks.AddObject(nd)
53 54
}

55 56 57 58
// Batch returns a new, empty Batch bound to this service with MaxSize set
// to 8 MiB.
func (n *dagService) Batch() *Batch {
	const maxBatchBytes = 8 * 1024 * 1024

	b := new(Batch)
	b.ds = n
	b.MaxSize = maxBatchBytes
	return b
}

59
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
60
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
61
	if n == nil {
62
		return nil, fmt.Errorf("dagService is nil")
63
	}
Jeromy's avatar
Jeromy committed
64

65 66
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
67

Jeromy's avatar
Jeromy committed
68
	b, err := n.Blocks.GetBlock(ctx, c)
69
	if err != nil {
70 71 72
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
Jeromy's avatar
Jeromy committed
73
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
74 75
	}

Jeromy's avatar
Jeromy committed
76 77 78 79 80 81 82 83 84
	var res *Node
	switch c.Type() {
	case cid.Protobuf:
		out, err := DecodeProtobuf(b.RawData())
		if err != nil {
			if strings.Contains(err.Error(), "Unmarshal failed") {
				return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", c)
			}
			return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
85
		}
Jeromy's avatar
Jeromy committed
86 87 88
		res = out
	default:
		return nil, fmt.Errorf("unrecognized formatting type")
89
	}
90

Jeromy's avatar
Jeromy committed
91
	res.cached = c
92

93
	return res, nil
94
}
Jeromy's avatar
Jeromy committed
95

Jeromy's avatar
Jeromy committed
96
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
97
	return n.Blocks.DeleteObject(nd)
Jeromy's avatar
Jeromy committed
98 99
}

100 101
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
102
	return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
Jeromy's avatar
Jeromy committed
103
}
104

Jeromy's avatar
Jeromy committed
105 106
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
107
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
108
	var out []int
Jeromy's avatar
Jeromy committed
109 110
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
111
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
112 113
		}
	}
Jeromy's avatar
Jeromy committed
114
	return out
Jeromy's avatar
Jeromy committed
115 116
}

117 118 119 120 121
// NodeOption is the value sent over node result channels (see GetMany): it
// carries either a fetched node or the error that prevented producing one.
type NodeOption struct {
	// Node is the fetched node; senders in this file leave it nil when
	// reporting an error.
	Node *Node
	// Err is non-nil when fetching or decoding failed.
	Err  error
}

Jeromy's avatar
Jeromy committed
122 123 124 125 126 127
// TODO: this is a mid-term hack to get around the fact that blocks don't
// have full CIDs and potentially (though we don't know of any such scenario)
// may have the same block with multiple different encodings.
// We have discussed the possiblity of using CIDs as datastore keys
// in the future. This would be a much larger changeset than i want to make
// right now.
Jeromy's avatar
Jeromy committed
128 129 130 131 132 133 134 135 136
func cidsToKeyMapping(cids []*cid.Cid) map[key.Key]*cid.Cid {
	mapping := make(map[key.Key]*cid.Cid)
	for _, c := range cids {
		mapping[key.Key(c.Hash())] = c
	}
	return mapping
}

func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
137
	out := make(chan *NodeOption, len(keys))
138
	blocks := ds.Blocks.GetBlocks(ctx, keys)
139 140
	var count int

Jeromy's avatar
Jeromy committed
141 142
	mapping := cidsToKeyMapping(keys)

143 144 145 146 147 148
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
149
					if count != len(keys) {
150
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
151
					}
152 153
					return
				}
Jeromy's avatar
Jeromy committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

				c := mapping[b.Key()]

				var nd *Node
				switch c.Type() {
				case cid.Protobuf:
					decnd, err := DecodeProtobuf(b.RawData())
					if err != nil {
						out <- &NodeOption{Err: err}
						return
					}
					decnd.cached = cid.NewCidV0(b.Multihash())
					nd = decnd
				default:
					out <- &NodeOption{Err: fmt.Errorf("unrecognized object type: %s", c.Type())}
169 170
					return
				}
Jeromy's avatar
Jeromy committed
171 172

				// buffered, no need to select
173
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
174 175
				count++

176
			case <-ctx.Done():
177
				out <- &NodeOption{Err: ctx.Err()}
178 179 180 181
				return
			}
		}
	}()
182
	return out
183 184
}

185
// GetDAG will fill out all of the links of the given Node.
186 187
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
188
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
189
	var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
190
	for _, lnk := range root.Links {
Jeromy's avatar
Jeromy committed
191
		cids = append(cids, cid.NewCidV0(lnk.Hash))
Jeromy's avatar
Jeromy committed
192 193
	}

Jeromy's avatar
Jeromy committed
194
	return GetNodes(ctx, ds, cids)
Jeromy's avatar
Jeromy committed
195 196
}

Jeromy's avatar
Jeromy committed
197 198
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
199
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
200 201 202 203 204 205

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
206
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
207
	for i := range keys {
208
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
209 210
	}

211
	dedupedKeys := dedupeKeys(keys)
212
	go func() {
213 214 215
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

216
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
217

218
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
219
			select {
220
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
221
				if !ok {
Jeromy's avatar
Jeromy committed
222 223 224
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
225 226
					return
				}
Jeromy's avatar
Jeromy committed
227

228
				if opt.Err != nil {
229 230 231
					for _, p := range promises {
						p.Fail(opt.Err)
					}
232 233 234 235
					return
				}

				nd := opt.Node
Jeromy's avatar
Jeromy committed
236
				is := FindLinks(keys, nd.Cid(), 0)
Jeromy's avatar
Jeromy committed
237
				for _, i := range is {
238
					count++
239
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
240 241 242
				}
			case <-ctx.Done():
				return
243 244 245
			}
		}
	}()
Jeromy's avatar
Jeromy committed
246 247 248
	return promises
}

249
// Remove duplicates from a list of keys
Jeromy's avatar
Jeromy committed
250 251 252 253
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
	set := cid.NewSet()
	for _, c := range cids {
		set.Add(c)
254
	}
Jeromy's avatar
Jeromy committed
255
	return set.Keys()
256 257
}

258
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
259
	return &nodePromise{
260
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
261
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
262
		err:  make(chan error, 1),
263
	}
Jeromy's avatar
Jeromy committed
264 265 266 267
}

type nodePromise struct {
	cache *Node
268 269
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
270
	ctx   context.Context
Jeromy's avatar
Jeromy committed
271
	err   chan error
Jeromy's avatar
Jeromy committed
272 273
}

Jeromy's avatar
Jeromy committed
274 275 276 277
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
278
type NodeGetter interface {
279
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
280
	Fail(err error)
281
	Send(*Node)
Jeromy's avatar
Jeromy committed
282 283 284
}

func (np *nodePromise) Fail(err error) {
285 286 287 288 289 290 291 292 293
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
294
	np.err <- err
Jeromy's avatar
Jeromy committed
295 296
}

297 298 299
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
300
	if np.cache != nil {
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
319 320 321
	}

	select {
322 323
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
324 325
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
326 327
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
328 329
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
330
	}
331
}
332 333 334 335

type Batch struct {
	ds *dagService

Jeromy's avatar
Jeromy committed
336
	objects []bserv.Object
337 338 339 340
	size    int
	MaxSize int
}

Jeromy's avatar
Jeromy committed
341
func (t *Batch) Add(nd *Node) (*cid.Cid, error) {
342
	d, err := nd.EncodeProtobuf(false)
343
	if err != nil {
Jeromy's avatar
Jeromy committed
344
		return nil, err
345 346
	}

Jeromy's avatar
Jeromy committed
347 348
	t.objects = append(t.objects, nd)
	t.size += len(d)
349
	if t.size > t.MaxSize {
Jeromy's avatar
Jeromy committed
350
		return nd.Cid(), t.Commit()
351
	}
Jeromy's avatar
Jeromy committed
352
	return nd.Cid(), nil
353 354 355
}

func (t *Batch) Commit() error {
Jeromy's avatar
Jeromy committed
356 357
	_, err := t.ds.Blocks.AddObjects(t.objects)
	t.objects = nil
358 359 360
	t.size = 0
	return err
}
361

Jeromy's avatar
Jeromy committed
362 363 364 365
// legacyCidFromLink builds a cid from the link's hash using cid.NewCidV0.
func legacyCidFromLink(lnk *Link) *cid.Cid {
	h := lnk.Hash
	return cid.NewCidV0(h)
}

366 367 368
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
Jeromy's avatar
Jeromy committed
369
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error {
370
	for _, lnk := range root.Links {
Jeromy's avatar
Jeromy committed
371 372 373
		c := legacyCidFromLink(lnk)
		if visit(c) {
			child, err := ds.Get(ctx, c)
374
			if err != nil {
375 376 377 378 379
				if bestEffort && err == ErrNotFound {
					continue
				} else {
					return err
				}
380
			}
Jeromy's avatar
Jeromy committed
381
			err = EnumerateChildren(ctx, ds, child, visit, bestEffort)
382 383 384 385 386 387 388
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
389

Jeromy's avatar
Jeromy committed
390 391
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool) error {
	toprocess := make(chan []*cid.Cid, 8)
392
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
393 394 395 396 397

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

398
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
399

400
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
401 402 403 404
	live := 1

	for {
		select {
405
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
406 407 408
			if !ok {
				return nil
			}
409 410 411 412 413 414 415

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
416 417 418
			// a node has been fetched
			live--

Jeromy's avatar
Jeromy committed
419
			var cids []*cid.Cid
Jeromy's avatar
Jeromy committed
420
			for _, lnk := range nd.Links {
Jeromy's avatar
Jeromy committed
421 422
				c := legacyCidFromLink(lnk)
				if visit(c) {
Jeromy's avatar
Jeromy committed
423
					live++
Jeromy's avatar
Jeromy committed
424
					cids = append(cids, c)
Jeromy's avatar
Jeromy committed
425 426 427 428 429 430 431
				}
			}

			if live == 0 {
				return nil
			}

Jeromy's avatar
Jeromy committed
432
			if len(cids) > 0 {
Jeromy's avatar
Jeromy committed
433
				select {
Jeromy's avatar
Jeromy committed
434
				case toprocess <- cids:
Jeromy's avatar
Jeromy committed
435 436 437 438 439 440 441 442 443 444
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Jeromy's avatar
Jeromy committed
445
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []*cid.Cid, out chan<- *NodeOption) {
446 447 448 449 450 451 452
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
453

Jeromy's avatar
Jeromy committed
454
	get := func(ks []*cid.Cid) {
455 456 457
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
458
			select {
459 460
			case out <- opt:
			case <-ctx.Done():
461
				return
Jeromy's avatar
Jeromy committed
462 463 464 465 466
			}
		}
	}

	for ks := range in {
467
		wg.Add(1)
468
		go get(ks)
Jeromy's avatar
Jeromy committed
469 470
	}
}