merkledag.go 9.51 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"strings"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	blocks "github.com/ipfs/go-ipfs/blocks"
10
	key "github.com/ipfs/go-ipfs/blocks/key"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jakub Sztandera's avatar
Jakub Sztandera committed
12
	logging "gx/ipfs/QmYtB7Qge8cJpXc4irsEp8zRqfnZMBeB7aTrMEkPk67DRv/go-log"
Jeromy's avatar
Jeromy committed
13
	"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

Jeromy's avatar
Jeromy committed
16
var log = logging.Logger("merkledag")
17
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
18

Jeromy's avatar
Jeromy committed
19 20
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
21 22
	Add(*Node) (key.Key, error)
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
23
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
27
	GetMany(context.Context, []key.Key) <-chan *NodeOption
28 29

	Batch() *Batch
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService that stores nodes as blocks in bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	ds := &dagService{Blocks: bs}
	return ds
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(nd *Node) (key.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50
	}

51
	d, err := nd.EncodeProtobuf(false)
52 53 54 55
	if err != nil {
		return "", err
	}

56
	mh, err := nd.Multihash()
57 58 59 60
	if err != nil {
		return "", err
	}

61 62
	b, _ := blocks.NewBlockWithHash(d, mh)

63 64 65
	return n.Blocks.AddBlock(b)
}

66 67 68 69
// Batch returns a new Batch bound to this service that auto-commits once more
// than 8 MiB of block data has been buffered.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 MiB
	return &Batch{MaxSize: defaultMaxSize, ds: n}
}

70
// Get retrieves a node from the dagService, fetching the block in the BlockService
71
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
jbenet's avatar
jbenet committed
72 73 74
	if k == "" {
		return nil, ErrNotFound
	}
75
	if n == nil {
76
		return nil, fmt.Errorf("dagService is nil")
77
	}
78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, k)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err)
87 88
	}

89
	res, err := DecodeProtobuf(b.Data())
90
	if err != nil {
91
		if strings.Contains(err.Error(), "Unmarshal failed") {
92
			return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", k)
93
		}
94 95 96
		return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
	}
	return res, nil
97
}
Jeromy's avatar
Jeromy committed
98

Jeromy's avatar
Jeromy committed
99 100 101 102 103 104 105 106
// Remove deletes the block backing nd from the BlockService.
func (n *dagService) Remove(nd *Node) error {
	k, err := nd.Key()
	if err == nil {
		err = n.Blocks.DeleteBlock(k)
	}
	return err
}

107 108
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
109
	return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
Jeromy's avatar
Jeromy committed
110
}
111

Jeromy's avatar
Jeromy committed
112 113
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
114
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
115
	var out []int
Jeromy's avatar
Jeromy committed
116 117
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
118
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
119 120
		}
	}
Jeromy's avatar
Jeromy committed
121
	return out
Jeromy's avatar
Jeromy committed
122 123
}

124 125 126 127 128 129 130
// NodeOption is one result delivered by GetMany: either a fetched Node or
// the error that ended the fetch.
type NodeOption struct {
	Node *Node // decoded node, set on success
	Err  error // failure, set instead of Node
}

func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
	out := make(chan *NodeOption, len(keys))
131
	blocks := ds.Blocks.GetBlocks(ctx, keys)
132 133
	var count int

134 135 136 137 138 139
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
140
					if count != len(keys) {
141
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
142
					}
143 144
					return
				}
145
				nd, err := DecodeProtobuf(b.Data())
146
				if err != nil {
147
					out <- &NodeOption{Err: err}
148 149
					return
				}
Jeromy's avatar
Jeromy committed
150 151

				// buffered, no need to select
152
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
153 154
				count++

155
			case <-ctx.Done():
156
				out <- &NodeOption{Err: ctx.Err()}
157 158 159 160
				return
			}
		}
	}()
161
	return out
162 163
}

164
// GetDAG will fill out all of the links of the given Node.
165 166
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
167
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
168
	var keys []key.Key
Jeromy's avatar
Jeromy committed
169
	for _, lnk := range root.Links {
170
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
171 172
	}

173
	return GetNodes(ctx, ds, keys)
Jeromy's avatar
Jeromy committed
174 175
}

Jeromy's avatar
Jeromy committed
176 177
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
178
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
179 180 181 182 183 184

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
185
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
186
	for i := range keys {
187
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
188 189
	}

190
	dedupedKeys := dedupeKeys(keys)
191
	go func() {
192 193 194
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

195
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
196

197
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
198
			select {
199
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
200
				if !ok {
Jeromy's avatar
Jeromy committed
201 202 203
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
204 205
					return
				}
Jeromy's avatar
Jeromy committed
206

207
				if opt.Err != nil {
208 209 210
					for _, p := range promises {
						p.Fail(opt.Err)
					}
211 212 213 214 215
					return
				}

				nd := opt.Node

216
				k, err := nd.Key()
Jeromy's avatar
Jeromy committed
217
				if err != nil {
218 219
					log.Error("Failed to get node key: ", err)
					continue
Jeromy's avatar
Jeromy committed
220
				}
221 222

				is := FindLinks(keys, k, 0)
Jeromy's avatar
Jeromy committed
223
				for _, i := range is {
224
					count++
225
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
226 227 228
				}
			case <-ctx.Done():
				return
229 230 231
			}
		}
	}()
Jeromy's avatar
Jeromy committed
232 233 234
	return promises
}

235
// Remove duplicates from a list of keys
236 237 238
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
239 240 241 242 243 244 245 246 247
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

248
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
249
	return &nodePromise{
250
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
251
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
252
		err:  make(chan error, 1),
253
	}
Jeromy's avatar
Jeromy committed
254 255 256 257
}

type nodePromise struct {
	cache *Node
258 259
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
260
	ctx   context.Context
Jeromy's avatar
Jeromy committed
261
	err   chan error
Jeromy's avatar
Jeromy committed
262 263
}

Jeromy's avatar
Jeromy committed
264 265 266 267
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
268
type NodeGetter interface {
269
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
270
	Fail(err error)
271
	Send(*Node)
Jeromy's avatar
Jeromy committed
272 273 274
}

func (np *nodePromise) Fail(err error) {
275 276 277 278 279 280 281 282 283
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
284
	np.err <- err
Jeromy's avatar
Jeromy committed
285 286
}

287 288 289
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
290
	if np.cache != nil {
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
309 310 311
	}

	select {
312 313
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
314 315
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
316 317
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
318 319
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
320
	}
321
}
322 323 324 325

type Batch struct {
	ds *dagService

326
	blocks  []blocks.Block
327 328 329 330 331
	size    int
	MaxSize int
}

func (t *Batch) Add(nd *Node) (key.Key, error) {
332
	d, err := nd.EncodeProtobuf(false)
333 334 335 336
	if err != nil {
		return "", err
	}

337
	mh, err := nd.Multihash()
338 339 340 341
	if err != nil {
		return "", err
	}

342 343 344
	b, _ := blocks.NewBlockWithHash(d, mh)

	k := key.Key(mh)
345 346

	t.blocks = append(t.blocks, b)
347
	t.size += len(b.Data())
348 349 350 351 352 353 354 355 356 357 358 359
	if t.size > t.MaxSize {
		return k, t.Commit()
	}
	return k, nil
}

// Commit writes all buffered blocks to the BlockService in a single AddBlocks
// call. The batch is emptied whether or not the write succeeds.
func (t *Batch) Commit() error {
	pending := t.blocks
	t.blocks, t.size = nil, 0
	_, err := t.ds.Blocks.AddBlocks(pending)
	return err
}
360 361 362 363

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
364
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet, bestEffort bool) error {
365 366 367 368 369 370
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if !set.Has(k) {
			set.Add(k)
			child, err := ds.Get(ctx, k)
			if err != nil {
371 372 373 374 375
				if bestEffort && err == ErrNotFound {
					continue
				} else {
					return err
				}
376
			}
377
			err = EnumerateChildren(ctx, ds, child, set, bestEffort)
378 379 380 381 382 383 384
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
385 386 387

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	toprocess := make(chan []key.Key, 8)
388
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
389 390 391 392 393

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

394
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
395

396
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
397 398 399 400
	live := 1

	for {
		select {
401
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
402 403 404
			if !ok {
				return nil
			}
405 406 407 408 409 410 411

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
			// a node has been fetched
			live--

			var keys []key.Key
			for _, lnk := range nd.Links {
				k := key.Key(lnk.Hash)
				if !set.Has(k) {
					set.Add(k)
					live++
					keys = append(keys, k)
				}
			}

			if live == 0 {
				return nil
			}

			if len(keys) > 0 {
				select {
				case toprocess <- keys:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

442 443 444 445 446 447 448 449
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
450

451
	get := func(ks []key.Key) {
452 453 454
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
455
			select {
456 457
			case out <- opt:
			case <-ctx.Done():
458
				return
Jeromy's avatar
Jeromy committed
459 460 461 462 463
			}
		}
	}

	for ks := range in {
464
		wg.Add(1)
465
		go get(ks)
Jeromy's avatar
Jeromy committed
466 467
	}
}