merkledag.go 10.3 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
10

Steven Allen's avatar
Steven Allen committed
11 12
	ipldcbor "gx/ipfs/QmNRz7BDWfdFNVLt7AVvmRefkrURD25EeoipcXqo6yoXU1/go-ipld-cbor"
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
13
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
14
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
func init() {
21 22 23
	ipld.Register(cid.DagProtobuf, DecodeProtobufBlock)
	ipld.Register(cid.Raw, DecodeRawBlock)
	ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
24 25
}

26 27 28 29 30
// contextKey is the private type of the keys this package stores in a
// context.Context. Using a dedicated named type (rather than a bare string)
// keeps these keys from colliding with keys set by other packages.
type contextKey string

// progressContextKey is the context key under which DeriveContext stores a
// *ProgressTracker and under which FetchGraph looks it up.
const progressContextKey contextKey = "progress"

31
// NewDAGService constructs a new DAGService (using the default implementation).
32
// Note that the default implementation is also an ipld.LinkGetter.
33
func NewDAGService(bs bserv.BlockService) *dagService {
34
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
35 36
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
47
func (n *dagService) Add(ctx context.Context, nd ipld.Node) error {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
49
		return fmt.Errorf("dagService is nil")
50 51
	}

52
	return n.Blocks.AddBlock(nd)
53 54
}

55
func (n *dagService) AddMany(ctx context.Context, nds []ipld.Node) error {
56 57 58
	blks := make([]blocks.Block, len(nds))
	for i, nd := range nds {
		blks[i] = nd
59
	}
60
	return n.Blocks.AddBlocks(blks)
61 62
}

63
// Get retrieves a node from the dagService, fetching the block in the BlockService
64
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
65
	if n == nil {
66
		return nil, fmt.Errorf("dagService is nil")
67
	}
Jeromy's avatar
Jeromy committed
68

69 70
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
71

Jeromy's avatar
Jeromy committed
72
	b, err := n.Blocks.GetBlock(ctx, c)
73
	if err != nil {
74
		if err == bserv.ErrNotFound {
75
			return nil, ipld.ErrNotFound
76
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
77
		return nil, fmt.Errorf("failed to get block for %s: %v", c, err)
78 79
	}

80
	return ipld.Decode(b)
81
}
Jeromy's avatar
Jeromy committed
82

83 84
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
85
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*ipld.Link, error) {
86 87 88
	if c.Type() == cid.Raw {
		return nil, nil
	}
89 90 91 92
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
93
	return node.Links(), nil
94 95
}

96 97
func (n *dagService) Remove(ctx context.Context, c *cid.Cid) error {
	return n.Blocks.DeleteBlock(c)
98 99
}

100 101 102 103 104 105 106 107 108 109 110 111 112
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
func (n *dagService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
	// TODO(#4608): make this batch all the way down.
	for _, c := range cids {
		if err := n.Blocks.DeleteBlock(c); err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
113 114
}

115 116 117
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
118 119
func GetLinksDirect(serv ipld.NodeGetter) GetLinks {
	return func(ctx context.Context, c *cid.Cid) ([]*ipld.Link, error) {
120
		nd, err := serv.Get(ctx, c)
121
		if err != nil {
Jeromy's avatar
Jeromy committed
122
			if err == bserv.ErrNotFound {
123
				err = ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
124
			}
125 126
			return nil, err
		}
127
		return nd.Links(), nil
128 129 130
	}
}

131 132 133 134
// sesGetter is a node getter whose block fetches all go through a single
// block service session.
type sesGetter struct {
	// bs is the session used by Get and GetMany.
	bs *bserv.Session
}

135
// Get gets a single node from the DAG.
136
func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
137
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
138 139
	switch err {
	case bserv.ErrNotFound:
140
		return nil, ipld.ErrNotFound
Jeromy's avatar
Jeromy committed
141
	default:
142
		return nil, err
Jeromy's avatar
Jeromy committed
143 144
	case nil:
		// noop
145 146
	}

147
	return ipld.Decode(blk)
148 149
}

150
// GetMany gets many nodes at once, batching the request if possible.
151
func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption {
152 153 154
	return getNodesFromBG(ctx, sg.bs, keys)
}

Jeromy's avatar
Jeromy committed
155
// Session returns a NodeGetter using a new session for block fetches.
156 157
func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {
	return &sesGetter{bserv.NewSession(ctx, n.Blocks)}
Jeromy's avatar
Jeromy committed
158 159
}

160
// FetchGraph fetches all nodes that are children of the given node
161 162
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
	var ng ipld.NodeGetter = serv
163 164
	ds, ok := serv.(*dagService)
	if ok {
165
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
166 167
	}

168
	v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
169
	if v == nil {
170
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
171 172 173 174 175 176 177
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		}
178
		return false
179
	}
180
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
181
}
182

Jeromy's avatar
Jeromy committed
183 184
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
185
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
186
	var out []int
187 188
	for i, lnkC := range links[start:] {
		if c.Equals(lnkC) {
Jeromy's avatar
Jeromy committed
189
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
190 191
		}
	}
Jeromy's avatar
Jeromy committed
192
	return out
Jeromy's avatar
Jeromy committed
193 194
}

195 196 197 198 199
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
200
func (n *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption {
201
	return getNodesFromBG(ctx, n.Blocks, keys)
202 203
}

Steven Allen's avatar
Steven Allen committed
204 205 206 207 208 209 210 211 212 213 214
// dedupKeys returns keys with duplicates removed.
//
// When keys holds no duplicates the input slice itself is returned;
// otherwise the result is a new slice built from a cid.Set.
func dedupKeys(keys []*cid.Cid) []*cid.Cid {
	unique := cid.NewSet()
	for i := range keys {
		unique.Add(keys[i])
	}

	if unique.Len() != len(keys) {
		return unique.Keys()
	}
	return keys
}

215
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []*cid.Cid) <-chan *ipld.NodeOption {
Steven Allen's avatar
Steven Allen committed
216 217
	keys = dedupKeys(keys)

218
	out := make(chan *ipld.NodeOption, len(keys))
219
	blocks := bs.GetBlocks(ctx, keys)
220 221
	var count int

222 223 224 225 226 227
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
228
					if count != len(keys) {
229
						out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
230
					}
231 232
					return
				}
Jeromy's avatar
Jeromy committed
233

234
				nd, err := ipld.Decode(b)
235
				if err != nil {
236
					out <- &ipld.NodeOption{Err: err}
237 238
					return
				}
Jeromy's avatar
Jeromy committed
239

240
				out <- &ipld.NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
241 242
				count++

243
			case <-ctx.Done():
244
				out <- &ipld.NodeOption{Err: ctx.Err()}
Jeromy's avatar
Jeromy committed
245
				return
246 247 248
			}
		}
	}()
249
	return out
250 251
}

252 253
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node.
254
type GetLinks func(context.Context, *cid.Cid) ([]*ipld.Link, error)
Jeromy's avatar
Jeromy committed
255

256 257 258 259
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
260 261 262
func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
	return func(ctx context.Context, c *cid.Cid) ([]*ipld.Link, error) {
		return ipld.GetLinks(ctx, ng, c)
Jeromy's avatar
Jeromy committed
263
	}
264
}
265

266 267 268
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
269 270 271
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
272 273
		return err
	}
274
	for _, lnk := range links {
275
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
276
		if visit(c) {
277
			err = EnumerateChildren(ctx, getLinks, c, visit)
278 279 280 281 282 283 284
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
285

286
// ProgressTracker is used to show progress when fetching nodes.
287 288 289 290 291
type ProgressTracker struct {
	// Total is the progress counter. Increment and Value access it while
	// holding lk.
	Total int
	// lk guards Total inside Increment and Value.
	lk    sync.Mutex
}

292 293
// DeriveContext returns a new context with value "progress" derived from
// the given one.
294
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
295
	return context.WithValue(ctx, progressContextKey, p)
296 297
}

298
// Increment adds one to the total progress.
299 300 301 302 303 304
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.Total++
}

305
// Value returns the current progress.
306 307 308 309 310 311
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	defer p.lk.Unlock()
	return p.Total
}

312 313 314
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync starts per call, and therefore the maximum number of
// link fetches it runs concurrently.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
315

316 317 318 319
// EnumerateChildrenAsync is equivalent to EnumerateChildren *except* that it
// fetches children in parallel.
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
320
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
321
	feed := make(chan *cid.Cid)
322
	out := make(chan []*ipld.Link)
323 324 325
	done := make(chan struct{})

	var setlk sync.Mutex
326

327 328
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
329

330
	defer cancel()
331

332 333
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
334 335
			for ic := range feed {
				setlk.Lock()
336
				shouldVisit := visit(ic)
337
				setlk.Unlock()
338

339 340 341 342 343 344 345
				if shouldVisit {
					links, err := getLinks(ctx, ic)
					if err != nil {
						errChan <- err
						return
					}

346
					select {
347
					case out <- links:
348
					case <-fetchersCtx.Done():
349 350 351
						return
					}
				}
Jeromy's avatar
Jeromy committed
352
				select {
353
				case done <- struct{}{}:
354
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
355 356
				}
			}
357
		}()
Jeromy's avatar
Jeromy committed
358
	}
359
	defer close(feed)
Jeromy's avatar
Jeromy committed
360

361
	send := feed
362
	var todobuffer []*cid.Cid
363
	var inProgress int
Jeromy's avatar
Jeromy committed
364

365
	next := c
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
382 383
		case links := <-out:
			for _, lnk := range links {
384 385 386 387 388 389
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
390
			}
391 392
		case err := <-errChan:
			return err
393

394
		case <-ctx.Done():
395
			return ctx.Err()
396
		}
Jeromy's avatar
Jeromy committed
397
	}
398

Jeromy's avatar
Jeromy committed
399
}
400

401 402 403 404
// Compile-time checks that the package's concrete types implement the
// go-ipld-format interfaces they are used as.
var (
	_ ipld.LinkGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*dagService)(nil)
	_ ipld.NodeGetter = (*sesGetter)(nil)
	_ ipld.DAGService = (*dagService)(nil)
)