merkledag.go 8.5 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
6
	"sync"
Jeromy's avatar
Jeromy committed
7

8
	blocks "github.com/ipfs/go-ipfs/blocks"
9
	key "github.com/ipfs/go-ipfs/blocks/key"
10
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
11 12
	"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Jeromy's avatar
Jeromy committed
15
var log = logging.Logger("merkledag")
16
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
17

Jeromy's avatar
Jeromy committed
18 19
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
20 21
	Add(*Node) (key.Key, error)
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
22
	Remove(*Node) error
23 24 25

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
26
	GetMany(context.Context, []key.Key) <-chan *NodeOption
27 28

	Batch() *Batch
Jeromy's avatar
Jeromy committed
29 30 31 32 33 34
}

// NewDAGService returns a DAGService that stores and loads node data
// through the given BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	ds := &dagService{Blocks: bs}
	return ds
}

35
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
36 37
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
38 39
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
40
type dagService struct {
41
	Blocks *bserv.BlockService
42 43
}

44
// Add serializes nd into a block, stores that block in the BlockService and
// returns the key reported by the BlockService.
func (n *dagService) Add(nd *Node) (key.Key, error) {
	// FIXME remove this assertion. protect with constructor invariant
	if n == nil {
		return "", fmt.Errorf("dagService is nil")
	}

	data, err := nd.EncodeProtobuf(false)
	if err != nil {
		return "", err
	}

	mh, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Data: data, Multihash: mh}
	return n.Blocks.AddBlock(blk)
}

65 66 67 68
// Batch returns a new Batch that buffers added nodes for this service and
// commits them once the buffered data exceeds 8 MiB.
func (n *dagService) Batch() *Batch {
	const defaultMaxBatchSize = 8 << 20 // 8 MiB
	return &Batch{
		ds:      n,
		MaxSize: defaultMaxBatchSize,
	}
}

69
// Get retrieves a node from the dagService, fetching the block in the BlockService
70
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
71
	if n == nil {
72
		return nil, fmt.Errorf("dagService is nil")
73
	}
74 75
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
76

Jeromy's avatar
Jeromy committed
77
	b, err := n.Blocks.GetBlock(ctx, k)
78
	if err != nil {
79 80 81
		if err == bserv.ErrNotFound {
			return nil, ErrNotFound
		}
82 83 84
		return nil, err
	}

85
	return DecodeProtobuf(b.Data)
86
}
Jeromy's avatar
Jeromy committed
87

Jeromy's avatar
Jeromy committed
88 89 90 91 92 93 94 95
// Remove deletes the block backing nd from the BlockService.
func (n *dagService) Remove(nd *Node) error {
	k, err := nd.Key()
	if err == nil {
		err = n.Blocks.DeleteBlock(k)
	}
	return err
}

96 97
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
Jeromy's avatar
Jeromy committed
98
	return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
Jeromy's avatar
Jeromy committed
99
}
100

Jeromy's avatar
Jeromy committed
101 102
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
103
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
104
	var out []int
Jeromy's avatar
Jeromy committed
105 106
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
107
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
108 109
		}
	}
Jeromy's avatar
Jeromy committed
110
	return out
Jeromy's avatar
Jeromy committed
111 112
}

113 114 115 116 117 118 119
// NodeOption carries one result of a GetMany call: either a fetched Node
// or the error that ended the fetch.
type NodeOption struct {
	Node *Node // the fetched node; nil when Err is set
	Err  error // non-nil if fetching or decoding failed
}

// GetMany fetches the blocks for keys from the BlockService, decodes them
// and sends the resulting nodes on the returned channel as they arrive.
// At most one NodeOption carrying an error is sent, after which the
// channel is closed. An error is sent when:
//   - a block fails to decode;
//   - ctx is done;
//   - the block service finishes without yielding a block for every key.
// The channel is also closed after all nodes have been delivered.
func (n *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
	// One slot per key plus one for the terminal error. Assuming the block
	// service yields at most one block per requested key, every send below
	// completes without a reader, so this goroutine always exits. The
	// consumer may stop reading, e.g. fetchNodes after ctx is cancelled.
	// Previously the buffer held only len(keys) items, and a ctx
	// cancellation after all nodes were buffered blocked forever.
	out := make(chan *NodeOption, len(keys)+1)
	// Named blkch rather than blocks to avoid shadowing the blocks package.
	blkch := n.Blocks.GetBlocks(ctx, keys)

	go func() {
		defer close(out)
		var count int
		for {
			select {
			case b, ok := <-blkch:
				if !ok {
					if count != len(keys) {
						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
					}
					return
				}

				nd, err := DecodeProtobuf(b.Data)
				if err != nil {
					out <- &NodeOption{Err: err}
					return
				}

				// buffered, no need to select
				out <- &NodeOption{Node: nd}
				count++

			case <-ctx.Done():
				out <- &NodeOption{Err: ctx.Err()}
				return
			}
		}
	}()
	return out
}

153
// GetDAG will fill out all of the links of the given Node.
154 155
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
156
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
157
	var keys []key.Key
Jeromy's avatar
Jeromy committed
158
	for _, lnk := range root.Links {
159
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
160 161
	}

162
	return GetNodes(ctx, ds, keys)
Jeromy's avatar
Jeromy committed
163 164
}

Jeromy's avatar
Jeromy committed
165 166
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
167
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
168 169 170 171 172 173

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
174 175
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
176
	for i := range keys {
Jeromy's avatar
Jeromy committed
177 178 179
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

180
	dedupedKeys := dedupeKeys(keys)
181
	go func() {
182 183 184
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

185
		nodechan := ds.GetMany(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
186

187
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
188
			select {
189
			case opt, ok := <-nodechan:
Jeromy's avatar
Jeromy committed
190 191 192
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
193

194 195 196 197 198 199 200
				if opt.Err != nil {
					log.Error("error fetching: ", opt.Err)
					return
				}

				nd := opt.Node

201
				k, err := nd.Key()
Jeromy's avatar
Jeromy committed
202
				if err != nil {
203 204
					log.Error("Failed to get node key: ", err)
					continue
Jeromy's avatar
Jeromy committed
205
				}
206 207

				is := FindLinks(keys, k, 0)
Jeromy's avatar
Jeromy committed
208
				for _, i := range is {
209
					count++
210
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
211 212 213
				}
			case <-ctx.Done():
				return
214 215 216
			}
		}
	}()
Jeromy's avatar
Jeromy committed
217 218 219
	return promises
}

220
// Remove duplicates from a list of keys
221 222 223
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
224 225 226 227 228 229 230 231 232
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
233 234 235 236 237 238 239 240 241 242 243 244 245 246
// newNodePromise creates an unresolved NodeGetter bound to ctx. It returns
// the promise together with the send side of its one-slot channel; sending
// a Node on that channel resolves the promise.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	np := &nodePromise{ctx: ctx, recv: ch}
	return np, ch
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	cache *Node           // node received by an earlier Get, if any
	recv  <-chan *Node    // receives the node that resolves the promise
	ctx   context.Context // creator's context; when done, Get fails
}

Jeromy's avatar
Jeromy committed
247 248 249 250
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
251
type NodeGetter interface {
252
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
253 254
}

255
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
256 257 258 259 260 261 262 263 264
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
265 266
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
267 268
	}
	return np.cache, nil
269
}
270 271 272 273 274 275 276 277 278 279

// Batch accumulates the blocks of nodes added through it and writes them
// to the owning dagService's BlockService in bulk. Blocks are written when
// Commit is called, or automatically once more than MaxSize bytes of block
// data are buffered.
type Batch struct {
	ds *dagService

	blocks  []*blocks.Block // blocks not yet committed
	size    int             // total len(Data) of the buffered blocks
	MaxSize int             // commit threshold in bytes
}

func (t *Batch) Add(nd *Node) (key.Key, error) {
280
	d, err := nd.EncodeProtobuf(false)
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
	if err != nil {
		return "", err
	}

	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
	if err != nil {
		return "", err
	}

	k := key.Key(b.Multihash)

	t.blocks = append(t.blocks, b)
	t.size += len(b.Data)
	if t.size > t.MaxSize {
		return k, t.Commit()
	}
	return k, nil
}

// Commit writes all buffered blocks to the BlockService and empties the
// batch, returning any error from the write.
func (t *Batch) Commit() error {
	_, err := t.ds.Blocks.AddBlocks(t.blocks)
	// Reset unconditionally: on failure the buffered blocks are dropped,
	// not retained for a retry.
	t.blocks, t.size = nil, 0
	return err
}
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328

// EnumerateChildren walks the DAG below root depth-first. It adds to set
// the key of every child not already in it and fetches that child so its
// own links are visited too. Keys already in set are skipped together with
// their subtrees. The first fetch error aborts the walk and is returned.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if set.Has(k) {
			continue
		}
		set.Add(k)

		child, err := ds.Get(ctx, k)
		if err != nil {
			return err
		}
		if err := EnumerateChildren(ctx, ds, child, set); err != nil {
			return err
		}
	}
	return nil
}
Jeromy's avatar
Jeromy committed
329 330 331

// EnumerateChildrenAsync is a concurrent counterpart to EnumerateChildren.
// It adds to set the key of every node reachable below root that is not
// already in set, and fetches each newly seen node so its links are
// explored too. Fetching is delegated to fetchNodes in a separate
// goroutine. It returns nil once every discovered node has been fetched,
// otherwise the first fetch error or context error it encounters.
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	pending := make(chan []key.Key, 8)
	fetched := make(chan *NodeOption, 8)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Defers run LIFO, so pending is closed before cancel runs. Closing it
	// ends fetchNodes' receive loop, after which fetchNodes closes fetched.
	defer close(pending)

	go fetchNodes(ctx, ds, pending, fetched)

	// Seed the loop with root as though it had just been fetched.
	fetched <- &NodeOption{Node: root}
	outstanding := 1

	for {
		var opt *NodeOption
		select {
		case <-ctx.Done():
			return ctx.Err()
		case o, ok := <-fetched:
			if !ok {
				return nil
			}
			opt = o
		}

		if opt.Err != nil {
			return opt.Err
		}

		// One outstanding node has arrived; each unseen link adds another.
		outstanding--
		var fresh []key.Key
		for _, lnk := range opt.Node.Links {
			k := key.Key(lnk.Hash)
			if set.Has(k) {
				continue
			}
			set.Add(k)
			fresh = append(fresh, k)
			outstanding++
		}

		if outstanding == 0 {
			return nil
		}
		if len(fresh) == 0 {
			continue
		}

		select {
		case pending <- fresh:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

386 387 388 389 390 391 392 393
// fetchNodes reads batches of keys from in until in is closed. It starts
// one goroutine per batch that calls ds.GetMany and forwards every result
// to out; a goroutine stops forwarding early once ctx is done. After in is
// closed and all those goroutines have finished, out is closed.
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
	var wg sync.WaitGroup
	defer func() {
		// Every sender must finish before out is closed; a late send on a
		// closed channel would panic.
		wg.Wait()
		close(out)
	}()

	for batch := range in {
		wg.Add(1)
		go func(ks []key.Key) {
			defer wg.Done()
			for opt := range ds.GetMany(ctx, ks) {
				select {
				case out <- opt:
				case <-ctx.Done():
					return
				}
			}
		}(batch)
	}
}