merkledag.go 9.03 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6

7
	blocks "github.com/ipfs/go-ipfs/blocks"
8
	key "github.com/ipfs/go-ipfs/blocks/key"
9
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
10 11
	"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13
)

Jeromy's avatar
Jeromy committed
14
var log = logging.Logger("merkledag")
15
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
16

Jeromy's avatar
Jeromy committed
17 18
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
19
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
20
	AddRecursive(*Node) error
21
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
22
	Remove(*Node) error
Jeromy's avatar
Jeromy committed
23
	RemoveRecursive(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
27
	GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)
28 29

	Batch() *Batch
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService that persists nodes as blocks in bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(nd *Node) (key.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50 51 52 53 54 55
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

56 57 58
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
59 60 61 62 63 64 65
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

66 67 68 69
// Batch returns a new Batch bound to this service. The batch commits itself
// once more than 8 MiB of encoded node data has been buffered.
func (n *dagService) Batch() *Batch {
	const defaultMaxBatchBytes = 8 << 20 // 8 MiB
	return &Batch{
		ds:      n,
		MaxSize: defaultMaxBatchBytes,
	}
}

Jeromy's avatar
Jeromy committed
70
// AddRecursive adds the given node and all child nodes to the BlockService
71
func (n *dagService) AddRecursive(nd *Node) error {
72 73
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
74
		log.Info("AddRecursive Error: %s\n", err)
75 76 77 78
		return err
	}

	for _, link := range nd.Links {
79 80 81 82 83
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
84 85 86 87 88 89
		}
	}

	return nil
}

90
// Get retrieves a node from the dagService, fetching the block in the BlockService
91
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
92
	if n == nil {
93
		return nil, fmt.Errorf("dagService is nil")
94
	}
95 96
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
97

Jeromy's avatar
Jeromy committed
98
	b, err := n.Blocks.GetBlock(ctx, k)
99
	if err != nil {
100 101 102
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
103 104 105 106 107
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
108

Jeromy's avatar
Jeromy committed
109
// Remove deletes the given node and all of its children from the BlockService
Jeromy's avatar
Jeromy committed
110
func (n *dagService) RemoveRecursive(nd *Node) error {
Jeromy's avatar
Jeromy committed
111 112
	for _, l := range nd.Links {
		if l.Node != nil {
Jeromy's avatar
Jeromy committed
113
			n.RemoveRecursive(l.Node)
Jeromy's avatar
Jeromy committed
114 115 116 117 118 119 120 121
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
122

Jeromy's avatar
Jeromy committed
123 124 125 126 127 128 129 130
// Remove deletes only nd's own block from the BlockService; blocks of its
// children are not touched.
func (n *dagService) Remove(nd *Node) error {
	k, err := nd.Key()
	if err == nil {
		err = n.Blocks.DeleteBlock(k)
	}
	return err
}

131 132
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
133
	return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
Jeromy's avatar
Jeromy committed
134
}
135

Jeromy's avatar
Jeromy committed
136 137
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
138
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
139
	var out []int
Jeromy's avatar
Jeromy committed
140 141
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
142
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
143 144
		}
	}
Jeromy's avatar
Jeromy committed
145
	return out
Jeromy's avatar
Jeromy committed
146 147
}

148
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
Jeromy's avatar
Jeromy committed
149
	out := make(chan *Node, len(keys))
150 151
	errs := make(chan error, 1)
	blocks := ds.Blocks.GetBlocks(ctx, keys)
152 153
	var count int

154 155 156 157 158 159
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
160 161 162
					if count != len(keys) {
						errs <- fmt.Errorf("failed to fetch all nodes")
					}
163 164 165 166 167 168 169
					return
				}
				nd, err := Decoded(b.Data)
				if err != nil {
					errs <- err
					return
				}
Jeromy's avatar
Jeromy committed
170 171 172 173 174

				// buffered, no need to select
				out <- nd
				count++

175
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
176
				errs <- ctx.Err()
177 178 179 180 181 182 183
				return
			}
		}
	}()
	return out, errs
}

184
// GetDAG will fill out all of the links of the given Node.
185 186
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
187
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
188
	var keys []key.Key
Jeromy's avatar
Jeromy committed
189
	for _, lnk := range root.Links {
190
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
191 192
	}

193
	return GetNodes(ctx, ds, keys)
Jeromy's avatar
Jeromy committed
194 195
}

Jeromy's avatar
Jeromy committed
196 197
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
198
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
199 200 201 202 203 204

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
205 206
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
207
	for i := range keys {
Jeromy's avatar
Jeromy committed
208 209 210
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

211
	dedupedKeys := dedupeKeys(keys)
212
	go func() {
213 214 215
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

216
		nodechan, errchan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
217

218
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
219
			select {
220
			case nd, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
221 222 223
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
224

225
				k, err := nd.Key()
Jeromy's avatar
Jeromy committed
226
				if err != nil {
227 228
					log.Error("Failed to get node key: ", err)
					continue
Jeromy's avatar
Jeromy committed
229
				}
230 231

				is := FindLinks(keys, k, 0)
Jeromy's avatar
Jeromy committed
232
				for _, i := range is {
233
					count++
234
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
235
				}
236 237 238
			case err := <-errchan:
				log.Error("error fetching: ", err)
				return
Jeromy's avatar
Jeromy committed
239 240
			case <-ctx.Done():
				return
241 242 243
			}
		}
	}()
Jeromy's avatar
Jeromy committed
244 245 246
	return promises
}

247
// Remove duplicates from a list of keys
248 249 250
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
251 252 253 254 255 256 257 258 259
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
260 261 262 263 264 265 266 267 268 269 270 271 272 273
// newNodePromise creates an unfulfilled promise bound to ctx. It also
// returns the send side of the promise's one-slot channel; sending a node
// on it fulfills the promise.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	p := &nodePromise{ctx: ctx, recv: ch}
	return p, ch
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	cache *Node           // node received so far; nil until a Get succeeds
	recv  <-chan *Node    // delivers the promised node
	ctx   context.Context // context the promise was created with
}

Jeromy's avatar
Jeromy committed
274 275 276 277
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
278
type NodeGetter interface {
279
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
280 281
}

282
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
283 284 285 286 287 288 289 290 291
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
292 293
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
294 295
	}
	return np.cache, nil
296
}
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334

// Batch buffers encoded nodes in memory and writes them to the BlockService
// in bulk. Add commits automatically once the buffered data exceeds MaxSize
// bytes; call Commit to flush whatever remains.
type Batch struct {
	ds *dagService

	blocks  []*blocks.Block // encoded nodes awaiting Commit
	size    int             // sum of len(Data) over the buffered blocks
	MaxSize int             // auto-commit threshold, in bytes of block data
}

// Add encodes nd, buffers the resulting block and returns its key. If the
// buffered data then exceeds MaxSize, the batch is committed and any Commit
// error is returned together with the key.
func (t *Batch) Add(nd *Node) (key.Key, error) {
	data, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}
	hash, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Data: data, Multihash: hash}
	k := key.Key(blk.Multihash)

	t.blocks = append(t.blocks, blk)
	t.size += len(blk.Data)
	if t.size <= t.MaxSize {
		return k, nil
	}
	return k, t.Commit()
}

// Commit writes every buffered block to the BlockService with a single
// AddBlocks call and empties the batch. The buffer is cleared even when
// AddBlocks returns an error.
func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks, t.size = nil, 0
	return err
}
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355

// EnumerateChildren walks the DAG below root depth-first. For each child
// key not yet in set, it adds the key, fetches the child through ds and
// descends into it. Keys already in set are neither fetched nor revisited.
// It stops at the first error.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if set.Has(k) {
			continue
		}
		set.Add(k)

		child, err := ds.Get(ctx, k)
		if err != nil {
			return err
		}
		if err := EnumerateChildren(ctx, ds, child, set); err != nil {
			return err
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411

// EnumerateChildrenAsync walks the DAG below root like EnumerateChildren.
// Instead of fetching one child at a time, it hands each node's batch of
// newly discovered child keys to fetchNodes, which fetches batches
// concurrently through ds.GetMany. Every discovered key is added to set;
// keys already in set are not fetched again.
//
// It returns nil once every discovered node has been processed. Otherwise
// it returns the first error reported by fetchNodes, or ctx.Err().
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	batches := make(chan []key.Key, 8)
	fetched := make(chan *Node, 8)
	fetchErrs := make(chan error, 1)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(batches)

	go fetchNodes(ctx, ds, batches, fetched, fetchErrs)

	// Feed the root through the same channel as fetched nodes so the loop
	// below processes its links too.
	fetched <- root
	// outstanding counts nodes that are queued or in flight but not yet
	// processed by this loop.
	outstanding := 1

	for {
		select {
		case nd, ok := <-fetched:
			if !ok {
				return nil
			}
			outstanding--

			var fresh []key.Key
			for _, lnk := range nd.Links {
				k := key.Key(lnk.Hash)
				if set.Has(k) {
					continue
				}
				set.Add(k)
				outstanding++
				fresh = append(fresh, k)
			}

			if outstanding == 0 {
				return nil
			}
			if len(fresh) == 0 {
				continue
			}

			select {
			case batches <- fresh:
			case <-ctx.Done():
				return ctx.Err()
			}
		case err := <-fetchErrs:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
	defer close(out)

412 413 414
	get := func(ks []key.Key) {
		nodes, errch := ds.GetMany(ctx, ks)
		for {
Jeromy's avatar
Jeromy committed
415
			select {
416 417 418 419 420 421 422 423 424 425 426 427
			case nd, ok := <-nodes:
				if !ok {
					return
				}
				select {
				case out <- nd:
				case <-ctx.Done():
					return
				}
			case err := <-errch:
				errs <- err
				return
Jeromy's avatar
Jeromy committed
428 429 430 431 432
			}
		}
	}

	for ks := range in {
433
		go get(ks)
Jeromy's avatar
Jeromy committed
434 435
	}
}