merkledag.go 9.58 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"strings"
7
	"sync"
Jeromy's avatar
Jeromy committed
8

9
	blocks "github.com/ipfs/go-ipfs/blocks"
10
	key "github.com/ipfs/go-ipfs/blocks/key"
11
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
12
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Jeromy's avatar
Jeromy committed
13
	"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

Jeromy's avatar
Jeromy committed
16
var log = logging.Logger("merkledag")
17
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
18

Jeromy's avatar
Jeromy committed
19 20
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
21 22
	Add(*Node) (key.Key, error)
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
23
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
27
	GetMany(context.Context, []key.Key) <-chan *NodeOption
28 29

	Batch() *Batch
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService that keeps its nodes in the given
// BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(nd *Node) (key.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50
	}

51
	d, err := nd.EncodeProtobuf(false)
52 53 54 55
	if err != nil {
		return "", err
	}

56
	mh, err := nd.Multihash()
57 58 59 60
	if err != nil {
		return "", err
	}

61 62
	b, _ := blocks.NewBlockWithHash(d, mh)

63 64 65
	return n.Blocks.AddBlock(b)
}

66 67 68 69
// Batch returns a new Batch bound to this dagService, configured with a
// MaxSize of 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxSize = 8 << 20 // 8 * 1024 * 1024 bytes
	return &Batch{ds: n, MaxSize: defaultMaxSize}
}

70
// Get retrieves a node from the dagService, fetching the block in the BlockService
71
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
jbenet's avatar
jbenet committed
72 73 74
	if k == "" {
		return nil, ErrNotFound
	}
75
	if n == nil {
76
		return nil, fmt.Errorf("dagService is nil")
77
	}
78 79
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
80

Jeromy's avatar
Jeromy committed
81
	b, err := n.Blocks.GetBlock(ctx, k)
82
	if err != nil {
83 84 85
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
86
		return nil, fmt.Errorf("Failed to get block for %s: %v", k.B58String(), err)
87 88
	}

89
	res, err := DecodeProtobuf(b.Data())
90
	if err != nil {
91
		if strings.Contains(err.Error(), "Unmarshal failed") {
92
			return nil, fmt.Errorf("The block referred to by '%s' was not a valid merkledag node", k)
93
		}
94 95
		return nil, fmt.Errorf("Failed to decode Protocol Buffers: %v", err)
	}
96 97 98

	res.cached = k.ToMultihash()

99
	return res, nil
100
}
Jeromy's avatar
Jeromy committed
101

Jeromy's avatar
Jeromy committed
102 103 104 105 106 107 108 109
// Remove deletes the block backing the given node from the BlockService.
// It returns the error from computing the node's key, if any, otherwise
// the result of the delete.
func (n *dagService) Remove(nd *Node) error {
	k, err := nd.Key()
	if err == nil {
		err = n.Blocks.DeleteBlock(k)
	}
	return err
}

110 111
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
112
	return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
Jeromy's avatar
Jeromy committed
113
}
114

Jeromy's avatar
Jeromy committed
115 116
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
117
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
118
	var out []int
Jeromy's avatar
Jeromy committed
119 120
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
121
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
122 123
		}
	}
Jeromy's avatar
Jeromy committed
124
	return out
Jeromy's avatar
Jeromy committed
125 126
}

127 128 129 130 131 132 133
// NodeOption is a single result delivered on a GetMany channel: either a
// fetched node or the error that occurred.
type NodeOption struct {
	// Node is the fetched node; it is left nil on results that carry an error.
	Node *Node
	// Err is the failure being reported; it is left nil on successful results.
	Err error
}

func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
	out := make(chan *NodeOption, len(keys))
134
	blocks := ds.Blocks.GetBlocks(ctx, keys)
135 136
	var count int

137 138 139 140 141 142
	go func() {
		defer close(out)
		for {
			select {
			case b, ok := <-blocks:
				if !ok {
143
					if count != len(keys) {
144
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
145
					}
146 147
					return
				}
148
				nd, err := DecodeProtobuf(b.Data())
149
				if err != nil {
150
					out <- &NodeOption{Err: err}
151 152
					return
				}
153
				nd.cached = b.Key().ToMultihash()
Jeromy's avatar
Jeromy committed
154 155

				// buffered, no need to select
156
				out <- &NodeOption{Node: nd}
Jeromy's avatar
Jeromy committed
157 158
				count++

159
			case <-ctx.Done():
160
				out <- &NodeOption{Err: ctx.Err()}
161 162 163 164
				return
			}
		}
	}()
165
	return out
166 167
}

168
// GetDAG will fill out all of the links of the given Node.
169 170
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
171
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
172
	var keys []key.Key
Jeromy's avatar
Jeromy committed
173
	for _, lnk := range root.Links {
174
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
175 176
	}

177
	return GetNodes(ctx, ds, keys)
Jeromy's avatar
Jeromy committed
178 179
}

Jeromy's avatar
Jeromy committed
180 181
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
182
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
183 184 185 186 187 188

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
189
	promises := make([]NodeGetter, len(keys))
rht's avatar
rht committed
190
	for i := range keys {
191
		promises[i] = newNodePromise(ctx)
Jeromy's avatar
Jeromy committed
192 193
	}

194
	dedupedKeys := dedupeKeys(keys)
195
	go func() {
196 197 198
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

199
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
200

201
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
202
			select {
203
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
204
				if !ok {
Jeromy's avatar
Jeromy committed
205 206 207
					for _, p := range promises {
						p.Fail(ErrNotFound)
					}
Jeromy's avatar
Jeromy committed
208 209
					return
				}
Jeromy's avatar
Jeromy committed
210

211
				if opt.Err != nil {
212 213 214
					for _, p := range promises {
						p.Fail(opt.Err)
					}
215 216 217 218 219
					return
				}

				nd := opt.Node

220
				k, err := nd.Key()
Jeromy's avatar
Jeromy committed
221
				if err != nil {
222 223
					log.Error("Failed to get node key: ", err)
					continue
Jeromy's avatar
Jeromy committed
224
				}
225 226

				is := FindLinks(keys, k, 0)
Jeromy's avatar
Jeromy committed
227
				for _, i := range is {
228
					count++
229
					promises[i].Send(nd)
Jeromy's avatar
Jeromy committed
230 231 232
				}
			case <-ctx.Done():
				return
233 234 235
			}
		}
	}()
Jeromy's avatar
Jeromy committed
236 237 238
	return promises
}

239
// Remove duplicates from a list of keys
240 241 242
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
243 244 245 246 247 248 249 250 251
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

252
func newNodePromise(ctx context.Context) NodeGetter {
Jeromy's avatar
Jeromy committed
253
	return &nodePromise{
254
		recv: make(chan *Node, 1),
Jeromy's avatar
Jeromy committed
255
		ctx:  ctx,
Jeromy's avatar
Jeromy committed
256
		err:  make(chan error, 1),
257
	}
Jeromy's avatar
Jeromy committed
258 259 260 261
}

type nodePromise struct {
	cache *Node
262 263
	clk   sync.Mutex
	recv  chan *Node
Jeromy's avatar
Jeromy committed
264
	ctx   context.Context
Jeromy's avatar
Jeromy committed
265
	err   chan error
Jeromy's avatar
Jeromy committed
266 267
}

Jeromy's avatar
Jeromy committed
268 269 270 271
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
272
type NodeGetter interface {
273
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
274
	Fail(err error)
275
	Send(*Node)
Jeromy's avatar
Jeromy committed
276 277 278
}

func (np *nodePromise) Fail(err error) {
279 280 281 282 283 284 285 286 287
	np.clk.Lock()
	v := np.cache
	np.clk.Unlock()

	// if promise has a value, don't fail it
	if v != nil {
		return
	}

Jeromy's avatar
Jeromy committed
288
	np.err <- err
Jeromy's avatar
Jeromy committed
289 290
}

291 292 293
func (np *nodePromise) Send(nd *Node) {
	var already bool
	np.clk.Lock()
Jeromy's avatar
Jeromy committed
294
	if np.cache != nil {
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
		already = true
	}
	np.cache = nd
	np.clk.Unlock()

	if already {
		panic("sending twice to the same promise is an error!")
	}

	np.recv <- nd
}

func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
	np.clk.Lock()
	c := np.cache
	np.clk.Unlock()
	if c != nil {
		return c, nil
Jeromy's avatar
Jeromy committed
313 314 315
	}

	select {
316 317
	case nd := <-np.recv:
		return nd, nil
Jeromy's avatar
Jeromy committed
318 319
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
320 321
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
322 323
	case err := <-np.err:
		return nil, err
Jeromy's avatar
Jeromy committed
324
	}
325
}
326 327 328 329

type Batch struct {
	ds *dagService

330
	blocks  []blocks.Block
331 332 333 334 335
	size    int
	MaxSize int
}

func (t *Batch) Add(nd *Node) (key.Key, error) {
336
	d, err := nd.EncodeProtobuf(false)
337 338 339 340
	if err != nil {
		return "", err
	}

341
	mh, err := nd.Multihash()
342 343 344 345
	if err != nil {
		return "", err
	}

346 347 348
	b, _ := blocks.NewBlockWithHash(d, mh)

	k := key.Key(mh)
349 350

	t.blocks = append(t.blocks, b)
351
	t.size += len(b.Data())
352 353 354 355 356 357 358 359 360 361 362 363
	if t.size > t.MaxSize {
		return k, t.Commit()
	}
	return k, nil
}

// Commit writes all pending blocks to the BlockService and empties the
// batch. The batch is reset whether or not the write succeeds.
func (t *Batch) Commit() error {
	pending := t.blocks
	t.blocks, t.size = nil, 0

	_, err := t.ds.Blocks.AddBlocks(pending)
	return err
}
364 365 366 367

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
368
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet, bestEffort bool) error {
369 370 371 372 373 374
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if !set.Has(k) {
			set.Add(k)
			child, err := ds.Get(ctx, k)
			if err != nil {
375 376 377 378 379
				if bestEffort && err == ErrNotFound {
					continue
				} else {
					return err
				}
380
			}
381
			err = EnumerateChildren(ctx, ds, child, set, bestEffort)
382 383 384 385 386 387 388
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
389 390 391

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	toprocess := make(chan []key.Key, 8)
392
	nodes := make(chan *NodeOption, 8)
Jeromy's avatar
Jeromy committed
393 394 395 396 397

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

398
	go fetchNodes(ctx, ds, toprocess, nodes)
Jeromy's avatar
Jeromy committed
399

400
	nodes <- &NodeOption{Node: root}
Jeromy's avatar
Jeromy committed
401 402 403 404
	live := 1

	for {
		select {
405
		case opt, ok := <-nodes:
Jeromy's avatar
Jeromy committed
406 407 408
			if !ok {
				return nil
			}
409 410 411 412 413 414 415

			if opt.Err != nil {
				return opt.Err
			}

			nd := opt.Node

Jeromy's avatar
Jeromy committed
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
			// a node has been fetched
			live--

			var keys []key.Key
			for _, lnk := range nd.Links {
				k := key.Key(lnk.Hash)
				if !set.Has(k) {
					set.Add(k)
					live++
					keys = append(keys, k)
				}
			}

			if live == 0 {
				return nil
			}

			if len(keys) > 0 {
				select {
				case toprocess <- keys:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

446 447 448 449 450 451 452 453
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
	var wg sync.WaitGroup
	defer func() {
		// wait for all 'get' calls to complete so we don't accidentally send
		// on a closed channel
		wg.Wait()
		close(out)
	}()
Jeromy's avatar
Jeromy committed
454

455
	get := func(ks []key.Key) {
456 457 458
		defer wg.Done()
		nodes := ds.GetMany(ctx, ks)
		for opt := range nodes {
Jeromy's avatar
Jeromy committed
459
			select {
460 461
			case out <- opt:
			case <-ctx.Done():
462
				return
Jeromy's avatar
Jeromy committed
463 464 465 466 467
			}
		}
	}

	for ks := range in {
468
		wg.Add(1)
469
		go get(ks)
Jeromy's avatar
Jeromy committed
470 471
	}
}