merkledag.go 8.16 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
	"fmt"
	"sync"

	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
	key "github.com/ipfs/go-ipfs/blocks/key"
	bserv "github.com/ipfs/go-ipfs/blockservice"
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)

Jeromy's avatar
Jeromy committed
14
var log = logging.Logger("merkledag")
15
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
16

Jeromy's avatar
Jeromy committed
17 18
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
19
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
20
	AddRecursive(*Node) error
21
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
22
	Remove(*Node) error
23 24 25

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
26
	GetDAG(context.Context, *Node) []NodeGetter
27
	GetNodes(context.Context, []key.Key) []NodeGetter
28 29

	Batch() *Batch
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService that stores and loads nodes through bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	ds := &dagService{Blocks: bs}
	return ds
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45
// Add adds a node to the dagService, storing the block in the BlockService
46
func (n *dagService) Add(nd *Node) (key.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50 51 52 53 54 55
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

56 57 58
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
59 60 61 62 63 64 65
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

66 67 68 69
// Batch returns a new, empty Batch bound to this service. Batch.Add commits
// the batch automatically once its buffered data exceeds 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultBatchMaxSize = 8 << 20 // 8 MiB
	return &Batch{
		ds:      n,
		MaxSize: defaultBatchMaxSize,
	}
}

Jeromy's avatar
Jeromy committed
70
// AddRecursive adds the given node and all child nodes to the BlockService
71
func (n *dagService) AddRecursive(nd *Node) error {
72 73
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
74
		log.Info("AddRecursive Error: %s\n", err)
75 76 77 78
		return err
	}

	for _, link := range nd.Links {
79 80 81 82 83
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
84 85 86 87 88 89
		}
	}

	return nil
}

90
// Get retrieves a node from the dagService, fetching the block in the BlockService
91
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
92
	if n == nil {
93
		return nil, fmt.Errorf("dagService is nil")
94
	}
95 96
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
97

Jeromy's avatar
Jeromy committed
98
	b, err := n.Blocks.GetBlock(ctx, k)
99
	if err != nil {
100 101 102
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
103 104 105 106 107
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
108

Jeromy's avatar
Jeromy committed
109
// Remove deletes the given node and all of its children from the BlockService
110
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
111 112 113 114 115 116 117 118 119 120 121
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
122

123 124 125 126 127
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
	toprocess := make(chan []key.Key, 8)
	nodes := make(chan *Node, 8)
	errs := make(chan error, 1)
Jeromy's avatar
Jeromy committed
128

129 130 131
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	defer close(toprocess)
Jeromy's avatar
Jeromy committed
132

133 134 135 136 137 138 139 140 141 142
	go fetchNodes(ctx, serv, toprocess, nodes, errs)

	nodes <- root
	live := 1

	for {
		select {
		case nd, ok := <-nodes:
			if !ok {
				return nil
Jeromy's avatar
Jeromy committed
143 144
			}

145 146 147
			var keys []key.Key
			for _, lnk := range nd.Links {
				keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
148
			}
149
			keys = dedupeKeys(keys)
Jeromy's avatar
Jeromy committed
150

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
			// keep track of open request, when zero, we're done
			live += len(keys) - 1

			if live == 0 {
				return nil
			}

			if len(keys) > 0 {
				select {
				case toprocess <- keys:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		case err := <-errs:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
Jeromy's avatar
Jeromy committed
170
	}
171
}
Jeromy's avatar
Jeromy committed
172

173 174 175 176 177 178 179 180
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
	defer close(out)
	for {
		select {
		case ks, ok := <-in:
			if !ok {
				return
			}
Jeromy's avatar
Jeromy committed
181

182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
			ng := ds.GetNodes(ctx, ks)
			for _, g := range ng {
				go func(g NodeGetter) {
					nd, err := g.Get(ctx)
					if err != nil {
						select {
						case errs <- err:
						case <-ctx.Done():
						}
						return
					}

					select {
					case out <- nd:
					case <-ctx.Done():
						return
					}
				}(g)
			}
		}
	}
Jeromy's avatar
Jeromy committed
203
}
204

Jeromy's avatar
Jeromy committed
205 206
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
207
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
208
	var out []int
Jeromy's avatar
Jeromy committed
209 210
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
211
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
212 213
		}
	}
Jeromy's avatar
Jeromy committed
214
	return out
Jeromy's avatar
Jeromy committed
215 216
}

217
// GetDAG will fill out all of the links of the given Node.
218 219
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
220
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
221
	var keys []key.Key
Jeromy's avatar
Jeromy committed
222
	for _, lnk := range root.Links {
223
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
224 225 226 227 228
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
229 230
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
231
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
232 233 234 235 236 237

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
238 239
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
240
	for i := range keys {
Jeromy's avatar
Jeromy committed
241 242 243
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

244
	dedupedKeys := dedupeKeys(keys)
245
	go func() {
246 247 248
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

249
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
250

251
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
252 253 254 255 256
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
257

Jeromy's avatar
Jeromy committed
258 259
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
260
					// NB: can happen with improperly formatted input data
261
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
262
					return
Jeromy's avatar
Jeromy committed
263
				}
Jeromy's avatar
Jeromy committed
264
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
265
				for _, i := range is {
266
					count++
267
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
268 269 270
				}
			case <-ctx.Done():
				return
271 272 273
			}
		}
	}()
Jeromy's avatar
Jeromy committed
274 275 276
	return promises
}

277
// Remove duplicates from a list of keys
278 279 280
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
281 282 283 284 285 286 287 288 289
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
290 291 292 293 294 295 296 297 298 299 300 301 302 303
// newNodePromise creates an unresolved promise bound to ctx and returns it
// together with the send side of its one-slot channel. The producer resolves
// the promise by sending the node on that channel; the buffer lets that send
// complete without waiting for a reader.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	resolve := make(chan *Node, 1)
	p := &nodePromise{ctx: ctx, recv: resolve}
	return p, resolve
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	cache *Node           // the node, once received; nil before that
	recv  <-chan *Node    // delivers the node from the producer
	ctx   context.Context // producer context; Get gives up when it is done
}

Jeromy's avatar
Jeromy committed
304 305 306 307
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
308
type NodeGetter interface {
309
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
310 311
}

312
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
313 314 315 316 317 318 319 320 321
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
322 323
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
324 325
	}
	return np.cache, nil
326
}
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364

// Batch buffers encoded nodes in memory and writes them to the backing
// BlockService in bulk. Obtain one from DAGService.Batch.
type Batch struct {
	ds *dagService

	blocks  []*blocks.Block // encoded nodes waiting for Commit
	size    int             // total bytes of Data across blocks
	MaxSize int             // Add commits automatically once size exceeds this
}

// Add encodes nd and queues its block for the next Commit, returning the
// node's key (its multihash). When the queued data grows past MaxSize, the
// batch is committed immediately. The commit's error, if any, is returned
// alongside the key.
func (t *Batch) Add(nd *Node) (key.Key, error) {
	data, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	hash, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Data: data, Multihash: hash}
	t.blocks = append(t.blocks, blk)
	t.size += len(blk.Data)

	k := key.Key(blk.Multihash)
	if t.size <= t.MaxSize {
		return k, nil
	}
	return k, t.Commit()
}

// Commit writes every queued block to the BlockService in one AddBlocks call
// and then empties the batch. The queue is cleared even when AddBlocks fails,
// so a later Commit does not retry those blocks.
func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	t.blocks, t.size = nil, 0
	return err
}
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385

// EnumerateChildren walks the DAG below root depth-first and adds to set the
// key of every descendant not already in it. Subtrees whose root key is
// already in set are not visited again. A key is added before its node is
// fetched, so after an error set may contain keys whose subtrees were not
// fully walked.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, link := range root.Links {
		k := key.Key(link.Hash)
		if set.Has(k) {
			continue
		}
		set.Add(k)

		child, err := ds.Get(ctx, k)
		if err != nil {
			return err
		}
		if err := EnumerateChildren(ctx, ds, child, set); err != nil {
			return err
		}
	}
	return nil
}