merkledag.go 8.4 KB
Newer Older
1
// package merkledag implements the ipfs Merkle DAG datastructures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6

7
	blocks "github.com/ipfs/go-ipfs/blocks"
8
	key "github.com/ipfs/go-ipfs/blocks/key"
9
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
10 11
	"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12 13
)

Jeromy's avatar
Jeromy committed
14
var log = logging.Logger("merkledag")
15
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
16

Jeromy's avatar
Jeromy committed
17 18
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
19
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
20
	AddRecursive(*Node) error
21
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
22
	Remove(*Node) error
Jeromy's avatar
Jeromy committed
23
	RemoveRecursive(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
27
	GetDAG(context.Context, *Node) []NodeGetter
28
	GetNodes(context.Context, []key.Key) []NodeGetter
29 30

	Batch() *Batch
Jeromy's avatar
Jeromy committed
31 32 33 34 35 36
}

func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{bs}
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks *bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
47
func (n *dagService) Add(nd *Node) (key.Key, error) {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
49
		return "", fmt.Errorf("dagService is nil")
50 51 52 53 54 55 56
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

57 58 59
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
60 61 62 63 64 65 66
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

67 68 69 70
func (n *dagService) Batch() *Batch {
	return &Batch{ds: n, MaxSize: 8 * 1024 * 1024}
}

Jeromy's avatar
Jeromy committed
71
// AddRecursive adds the given node and all child nodes to the BlockService
72
func (n *dagService) AddRecursive(nd *Node) error {
73 74
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
75
		log.Info("AddRecursive Error: %s\n", err)
76 77 78 79
		return err
	}

	for _, link := range nd.Links {
80 81 82 83 84
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
85 86 87 88 89 90
		}
	}

	return nil
}

91
// Get retrieves a node from the dagService, fetching the block in the BlockService
92
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
93
	if n == nil {
94
		return nil, fmt.Errorf("dagService is nil")
95
	}
96 97
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
98

Jeromy's avatar
Jeromy committed
99
	b, err := n.Blocks.GetBlock(ctx, k)
100
	if err != nil {
101 102 103
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
104 105 106 107 108
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
109

Jeromy's avatar
Jeromy committed
110
// Remove deletes the given node and all of its children from the BlockService
Jeromy's avatar
Jeromy committed
111
func (n *dagService) RemoveRecursive(nd *Node) error {
Jeromy's avatar
Jeromy committed
112 113
	for _, l := range nd.Links {
		if l.Node != nil {
Jeromy's avatar
Jeromy committed
114
			n.RemoveRecursive(l.Node)
Jeromy's avatar
Jeromy committed
115 116 117 118 119 120 121 122
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
123

Jeromy's avatar
Jeromy committed
124 125 126 127 128 129 130 131
func (n *dagService) Remove(nd *Node) error {
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}

132 133
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
134
	return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
Jeromy's avatar
Jeromy committed
135
}
136

Jeromy's avatar
Jeromy committed
137 138
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
139
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
140
	var out []int
Jeromy's avatar
Jeromy committed
141 142
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
143
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
144 145
		}
	}
Jeromy's avatar
Jeromy committed
146
	return out
Jeromy's avatar
Jeromy committed
147 148
}

149
// GetDAG will fill out all of the links of the given Node.
150 151
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
152
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
153
	var keys []key.Key
Jeromy's avatar
Jeromy committed
154
	for _, lnk := range root.Links {
155
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
156 157 158 159 160
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
161 162
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
163
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
164 165 166 167 168 169

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
170 171
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
172
	for i := range keys {
Jeromy's avatar
Jeromy committed
173 174 175
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

176
	dedupedKeys := dedupeKeys(keys)
177
	go func() {
178 179 180
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

181
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
182

183
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
184 185 186 187 188
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
189

Jeromy's avatar
Jeromy committed
190 191
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
192
					// NB: can happen with improperly formatted input data
193
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
194
					return
Jeromy's avatar
Jeromy committed
195
				}
Jeromy's avatar
Jeromy committed
196
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
197
				for _, i := range is {
198
					count++
199
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
200 201 202
				}
			case <-ctx.Done():
				return
203 204 205
			}
		}
	}()
Jeromy's avatar
Jeromy committed
206 207 208
	return promises
}

209
// Remove duplicates from a list of keys
210 211 212
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
213 214 215 216 217 218 219 220 221
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
222 223 224 225 226 227 228 229 230 231 232 233 234 235
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	return &nodePromise{
		recv: ch,
		ctx:  ctx,
	}, ch
}

type nodePromise struct {
	cache *Node
	recv  <-chan *Node
	ctx   context.Context
}

Jeromy's avatar
Jeromy committed
236 237 238 239
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
240
type NodeGetter interface {
241
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
242 243
}

244
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
245 246 247 248 249 250 251 252 253
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
254 255
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
256 257
	}
	return np.cache, nil
258
}
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296

type Batch struct {
	ds *dagService

	blocks  []*blocks.Block
	size    int
	MaxSize int
}

func (t *Batch) Add(nd *Node) (key.Key, error) {
	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
	if err != nil {
		return "", err
	}

	k := key.Key(b.Multihash)

	t.blocks = append(t.blocks, b)
	t.size += len(b.Data)
	if t.size > t.MaxSize {
		return k, t.Commit()
	}
	return k, nil
}

func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks = nil
	t.size = 0
	return err
}
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if !set.Has(k) {
			set.Add(k)
			child, err := ds.Get(ctx, k)
			if err != nil {
				return err
			}
			err = EnumerateChildren(ctx, ds, child, set)
			if err != nil {
				return err
			}
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	toprocess := make(chan []key.Key, 8)
	nodes := make(chan *Node, 8)
	errs := make(chan error, 1)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)

	go fetchNodes(ctx, ds, toprocess, nodes, errs)

	nodes <- root
	live := 1

	for {
		select {
		case nd, ok := <-nodes:
			if !ok {
				return nil
			}
			// a node has been fetched
			live--

			var keys []key.Key
			for _, lnk := range nd.Links {
				k := key.Key(lnk.Hash)
				if !set.Has(k) {
					set.Add(k)
					live++
					keys = append(keys, k)
				}
			}

			if live == 0 {
				return nil
			}

			if len(keys) > 0 {
				select {
				case toprocess <- keys:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case err := <-errs:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
	defer close(out)

	get := func(g NodeGetter) {
		nd, err := g.Get(ctx)
		if err != nil {
			select {
			case errs <- err:
			case <-ctx.Done():
			}
			return
		}

		select {
		case out <- nd:
		case <-ctx.Done():
			return
		}
	}

	for ks := range in {
		ng := ds.GetNodes(ctx, ks)
		for _, g := range ng {
			go get(g)
		}
	}
}