merkledag.go 9.47 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"context"
6
	"fmt"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
10

Steven Allen's avatar
Steven Allen committed
11 12 13
	ipldcbor "gx/ipfs/QmNRz7BDWfdFNVLt7AVvmRefkrURD25EeoipcXqo6yoXU1/go-ipld-cbor"
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
	node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
14
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

17 18 19 20 21 22 23 24 25
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
// functionality should go in a `go-ipld` repo but that will take a lot of work
// and design.
//
// init registers a block decoder with go-ipld-format for each codec this
// package handles: DAG-protobuf (DecodeProtobufBlock), raw (DecodeRawBlock) and
// DAG-CBOR (go-ipld-cbor's DecodeBlock). NOTE(review): presumably these are the
// decoders node.Decode (used by Get below) dispatches to by CID type — confirm
// against go-ipld-format.
func init() {
	node.Register(cid.DagProtobuf, DecodeProtobufBlock)
	node.Register(cid.Raw, DecodeRawBlock)
	node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

26
// NewDAGService constructs a new DAGService (using the default implementation).
27
func NewDAGService(bs bserv.BlockService) *dagService {
28
	return &dagService{Blocks: bs}
Jeromy's avatar
Jeromy committed
29 30
}

31
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
32 33
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
34 35
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
36
type dagService struct {
37
	Blocks bserv.BlockService
38 39
}

40
// Add adds a node to the dagService, storing the block in the BlockService
41
func (n *dagService) Add(ctx context.Context, nd node.Node) error {
42
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
43
		return fmt.Errorf("dagService is nil")
44 45
	}

46
	return n.Blocks.AddBlock(nd)
47 48
}

49 50 51 52
func (n *dagService) AddMany(ctx context.Context, nds []node.Node) error {
	blks := make([]blocks.Block, len(nds))
	for i, nd := range nds {
		blks[i] = nd
53
	}
54
	return n.Blocks.AddBlocks(blks)
55 56
}

57
// Get retrieves a node from the dagService, fetching the block in the BlockService
Jeromy's avatar
Jeromy committed
58
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
59
	if n == nil {
60
		return nil, fmt.Errorf("dagService is nil")
61
	}
Jeromy's avatar
Jeromy committed
62

63 64
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
65

Jeromy's avatar
Jeromy committed
66
	b, err := n.Blocks.GetBlock(ctx, c)
67
	if err != nil {
68
		if err == bserv.ErrNotFound {
69
			return nil, node.ErrNotFound
70
		}
Jeromy's avatar
Jeromy committed
71
		return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
72 73
	}

74
	return node.Decode(b)
75
}
Jeromy's avatar
Jeromy committed
76

77 78
// GetLinks return the links for the node, the node doesn't necessarily have
// to exist locally.
Jeromy's avatar
Jeromy committed
79
func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
80 81 82
	if c.Type() == cid.Raw {
		return nil, nil
	}
83 84 85 86
	node, err := n.Get(ctx, c)
	if err != nil {
		return nil, err
	}
87
	return node.Links(), nil
88 89
}

90 91
func (n *dagService) Remove(ctx context.Context, c *cid.Cid) error {
	return n.Blocks.DeleteBlock(c)
92 93
}

94 95 96 97 98 99 100 101 102 103 104 105 106
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
func (n *dagService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
	// TODO(#4608): make this batch all the way down.
	for _, c := range cids {
		if err := n.Blocks.DeleteBlock(c); err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
107 108
}

109 110 111
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService.  If the node does not exist
// locally (and can not be retrieved) an error will be returned.
Jeromy's avatar
Jeromy committed
112
func GetLinksDirect(serv node.NodeGetter) GetLinks {
113
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
114
		nd, err := serv.Get(ctx, c)
115
		if err != nil {
Jeromy's avatar
Jeromy committed
116
			if err == bserv.ErrNotFound {
117
				err = node.ErrNotFound
Jeromy's avatar
Jeromy committed
118
			}
119 120
			return nil, err
		}
121
		return nd.Links(), nil
122 123 124
	}
}

125 126 127 128
// sesGetter is a node.NodeGetter that fetches every block through one
// blockservice Session. FetchGraph uses it to walk a whole DAG through a
// single session when the DAGService is the default *dagService.
type sesGetter struct {
	// bs is the session all block requests are issued on.
	bs *bserv.Session
}

129
// Get gets a single node from the DAG.
130 131
func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	blk, err := sg.bs.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
132 133
	switch err {
	case bserv.ErrNotFound:
134
		return nil, node.ErrNotFound
Jeromy's avatar
Jeromy committed
135
	default:
136
		return nil, err
Jeromy's avatar
Jeromy committed
137 138
	case nil:
		// noop
139 140
	}

141
	return node.Decode(blk)
142 143
}

144 145 146 147 148
// GetMany requests all of keys through the session in a single batch and
// returns a channel of decoded nodes. Delivery and error semantics are those
// of getNodesFromBG.
func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *node.NodeOption {
	var bg bserv.BlockGetter = sg.bs
	return getNodesFromBG(ctx, bg, keys)
}

149
// FetchGraph fetches all nodes that are children of the given node
150
func FetchGraph(ctx context.Context, root *cid.Cid, serv node.DAGService) error {
151 152 153
	var ng node.NodeGetter = serv
	ds, ok := serv.(*dagService)
	if ok {
154
		ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
155 156
	}

157 158
	v, _ := ctx.Value("progress").(*ProgressTracker)
	if v == nil {
159
		return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
160 161 162 163 164 165 166 167 168 169
	}
	set := cid.NewSet()
	visit := func(c *cid.Cid) bool {
		if set.Visit(c) {
			v.Increment()
			return true
		} else {
			return false
		}
	}
170
	return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
Jeromy's avatar
Jeromy committed
171
}
172

Jeromy's avatar
Jeromy committed
173 174
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
175
func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
Jeromy's avatar
Jeromy committed
176
	var out []int
Jeromy's avatar
Jeromy committed
177 178
	for i, lnk_c := range links[start:] {
		if c.Equals(lnk_c) {
Jeromy's avatar
Jeromy committed
179
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
180 181
		}
	}
Jeromy's avatar
Jeromy committed
182
	return out
Jeromy's avatar
Jeromy committed
183 184
}

185 186 187 188 189 190 191
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
func (n *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *node.NodeOption {
	return getNodesFromBG(ctx, n.Blocks, keys)
192 193
}

194 195 196
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []*cid.Cid) <-chan *node.NodeOption {
	out := make(chan *node.NodeOption, len(keys))
	blocks := bs.GetBlocks(ctx, keys)
197 198
	var count int

199 200 201 202 203 204
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
205
					if count != len(keys) {
206
						out <- &node.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
207
					}
208 209
					return
				}
Jeromy's avatar
Jeromy committed
210

211
				nd, err := node.Decode(b)
212
				if err != nil {
213
					out <- &node.NodeOption{Err: err}
214 215
					return
				}
Jeromy's avatar
Jeromy committed
216

217
				out <- &node.NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
218 219
				count++

220
			case <-ctx.Done():
221
				out <- &node.NodeOption{Err: ctx.Err()}
Jeromy's avatar
Jeromy committed
222
				return
223 224 225
			}
		}
	}()
226
	return out
227 228
}

229 230 231
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node: given a CID, it returns the links
// of that node, or an error if they cannot be obtained.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
Jeromy's avatar
Jeromy committed
232

233 234 235 236 237 238 239
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
func GetLinksWithDAG(ng node.NodeGetter) GetLinks {
	return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
		return node.GetLinks(ctx, ng, c)
Jeromy's avatar
Jeromy committed
240
	}
241
}
242

243 244 245
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
246 247 248
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
	links, err := getLinks(ctx, root)
	if err != nil {
249 250
		return err
	}
251
	for _, lnk := range links {
252
		c := lnk.Cid
Jeromy's avatar
Jeromy committed
253
		if visit(c) {
254
			err = EnumerateChildren(ctx, getLinks, c, visit)
255 256 257 258 259 260 261
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
262

263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
// ProgressTracker is a mutex-guarded counter that FetchGraph increments once
// per newly visited CID. Increment and Value are safe for concurrent use.
// Total is exported, but reading it directly bypasses the lock, so prefer
// Value.
type ProgressTracker struct {
	Total int
	lk    sync.Mutex
}

// DeriveContext returns a child of ctx that carries p under the "progress"
// key. FetchGraph looks for a tracker under that key.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, "progress", p)
}

// Increment adds one to the tracked total.
func (p *ProgressTracker) Increment() {
	p.lk.Lock()
	p.Total++
	p.lk.Unlock()
}

// Value returns the current total.
func (p *ProgressTracker) Value() int {
	p.lk.Lock()
	total := p.Total
	p.lk.Unlock()
	return total
}

284 285 286
// FetchGraphConcurrency is the number of worker goroutines that
// EnumerateChildrenAsync (and therefore FetchGraph) starts per call, i.e. the
// maximum number of concurrent getLinks fetches during one traversal. It is
// read when the traversal starts.
var FetchGraphConcurrency = 8
Jeromy's avatar
Jeromy committed
287

288 289 290 291
// EnumerateChildrenAsync is equivalent to EnumerateChildren *except* that it
// fetches children in parallel.
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
292
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
293
	feed := make(chan *cid.Cid)
294
	out := make(chan []*node.Link)
295 296 297
	done := make(chan struct{})

	var setlk sync.Mutex
298

299 300
	errChan := make(chan error)
	fetchersCtx, cancel := context.WithCancel(ctx)
301

302
	defer cancel()
303

304 305
	for i := 0; i < FetchGraphConcurrency; i++ {
		go func() {
306
			for ic := range feed {
307
				links, err := getLinks(ctx, ic)
308 309 310
				if err != nil {
					errChan <- err
					return
Jeromy's avatar
Jeromy committed
311
				}
312

313 314 315
				setlk.Lock()
				unseen := visit(ic)
				setlk.Unlock()
316

317
				if unseen {
318
					select {
319
					case out <- links:
320
					case <-fetchersCtx.Done():
321 322 323
						return
					}
				}
Jeromy's avatar
Jeromy committed
324
				select {
325
				case done <- struct{}{}:
326
				case <-fetchersCtx.Done():
Jeromy's avatar
Jeromy committed
327 328
				}
			}
329
		}()
Jeromy's avatar
Jeromy committed
330
	}
331
	defer close(feed)
Jeromy's avatar
Jeromy committed
332

333
	send := feed
334
	var todobuffer []*cid.Cid
335
	var inProgress int
Jeromy's avatar
Jeromy committed
336

337
	next := c
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
	for {
		select {
		case send <- next:
			inProgress++
			if len(todobuffer) > 0 {
				next = todobuffer[0]
				todobuffer = todobuffer[1:]
			} else {
				next = nil
				send = nil
			}
		case <-done:
			inProgress--
			if inProgress == 0 && next == nil {
				return nil
			}
354 355
		case links := <-out:
			for _, lnk := range links {
356 357 358 359 360 361
				if next == nil {
					next = lnk.Cid
					send = feed
				} else {
					todobuffer = append(todobuffer, lnk.Cid)
				}
Jeromy's avatar
Jeromy committed
362
			}
363 364
		case err := <-errChan:
			return err
365

366
		case <-ctx.Done():
367
			return ctx.Err()
368
		}
Jeromy's avatar
Jeromy committed
369
	}
370

Jeromy's avatar
Jeromy committed
371
}
372 373 374 375 376

// Compile-time checks that the getters in this file implement the
// go-ipld-format interfaces they are used as.
var (
	_ node.LinkGetter = (*dagService)(nil)
	_ node.NodeGetter = (*dagService)(nil)
	_ node.NodeGetter = (*sesGetter)(nil)
	_ node.DAGService = (*dagService)(nil)
)