add.go 12.6 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
camelmasa's avatar
camelmasa committed
10
	"path/filepath"
11
	"strconv"
Jeromy's avatar
Jeromy committed
12

Jeromy's avatar
Jeromy committed
13
	core "github.com/ipfs/go-ipfs/core"
Jeromy's avatar
Jeromy committed
14
	mfs "github.com/ipfs/go-ipfs/mfs"
15
	"github.com/ipfs/go-ipfs/pin"
Steven Allen's avatar
Steven Allen committed
16 17 18 19 20
	unixfs "gx/ipfs/QmVxjT67BU1QZUPzSLNZT6DkDzVNfPfkzqNyJYFXxSH2hA/go-unixfs"
	balanced "gx/ipfs/QmVxjT67BU1QZUPzSLNZT6DkDzVNfPfkzqNyJYFXxSH2hA/go-unixfs/importer/balanced"
	ihelper "gx/ipfs/QmVxjT67BU1QZUPzSLNZT6DkDzVNfPfkzqNyJYFXxSH2hA/go-unixfs/importer/helpers"
	trickle "gx/ipfs/QmVxjT67BU1QZUPzSLNZT6DkDzVNfPfkzqNyJYFXxSH2hA/go-unixfs/importer/trickle"
	dag "gx/ipfs/QmfKKGzisaoP4oiHQSHz1zLbXDCTeXe7NVfX1FAMKzcHmt/go-merkledag"
Jeromy's avatar
Jeromy committed
21

Steven Allen's avatar
Steven Allen committed
22
	posinfo "gx/ipfs/QmSHjPDw8yNgLZ7cBfX7w3Smn7PHwYhNEpd4LHQQxUg35L/go-ipfs-posinfo"
Steven Allen's avatar
Steven Allen committed
23
	bstore "gx/ipfs/QmTCHqj6s51pDu1GaPGyBW2VdmCUvtzLCF6nWykfX9ZYRt/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
24 25 26
	chunker "gx/ipfs/QmVDjhUMtkRskBFAVNwyXuLSKbeAya7JKPnzAxMKDaK4x4/go-ipfs-chunker"
	cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
	ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
Steven Allen's avatar
Steven Allen committed
27
	logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
28
	files "gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit/files"
29 30
)

Jeromy's avatar
Jeromy committed
31
// log is the package-level logger, registered under the name "coreunix".
var log = logging.Logger("coreunix")
32

33 34 35
// progressReaderIncrement is how many bytes of progress to wait before
// sending a progress update message.
const progressReaderIncrement = 1024 * 256

// liveCacheSize is the number of addFile calls allowed between flushes of
// the MFS root: once Adder.liveNodes reaches this value, addFile flushes the
// root and resets the counter.
var liveCacheSize = uint64(256 << 10)

38 39 40 41 42 43 44 45
// Link describes a single outgoing link of a DAG node as it is reported
// inside an Object.
type Link struct {
	// Name is the link's name within its parent node.
	Name string
	// Hash is the string form of the link target's identifier.
	Hash string
	// Size is the size recorded on the link.
	Size uint64
}

type Object struct {
	Hash  string
	Links []Link
46
	Size  string
47 48 49 50 51 52
}

// AddedObject is the message sent on an Adder's output channel. It is used
// both to announce an added node (Name, Hash, Size) and to report read
// progress on a file (Name, Bytes). Every field except Name is omitted from
// the JSON encoding when empty.
type AddedObject struct {
	// Name is the path or file name the message refers to.
	Name string
	// Hash is the string form of the added node's CID.
	Hash string `json:",omitempty"`
	// Bytes is the cumulative number of bytes read so far (progress updates).
	Bytes int64 `json:",omitempty"`
	// Size is the added node's size, formatted as a base-10 string.
	Size string `json:",omitempty"`
}

56
// NewAdder Returns a new Adder used for a file add operation.
57
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) {
58
	return &Adder{
59 60 61 62 63 64 65 66 67 68
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
69
	}, nil
70 71
}

72
// Adder holds the switches passed to the `add` command.
73
type Adder struct {
74 75 76
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
77
	dagService ipld.DAGService
78 79 80 81 82
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
83
	RawLeaves  bool
84 85
	Silent     bool
	Wrap       bool
86
	NoCopy     bool
87
	Chunker    string
88
	root       ipld.Node
89
	mroot      *mfs.Root
90
	unlocker   bstore.Unlocker
Jeromy's avatar
Jeromy committed
91
	tempRoot   *cid.Cid
92
	Prefix     *cid.Prefix
93
	liveNodes  uint64
94 95 96 97 98 99 100 101 102 103 104 105 106 107
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
	rnode.SetPrefix(adder.Prefix)
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
108 109
}

110
// SetMfsRoot sets `r` as the root for Adder.
Jeromy's avatar
Jeromy committed
111
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
112
	adder.mroot = r
Jeromy's avatar
Jeromy committed
113 114
}

115
// Constructs a node from reader's data, and adds it. Doesn't pin.
116
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
117
	chnk, err := chunker.FromString(reader, adder.Chunker)
118 119 120
	if err != nil {
		return nil, err
	}
121

122 123 124 125
	params := ihelper.DagBuilderParams{
		Dagserv:   adder.dagService,
		RawLeaves: adder.RawLeaves,
		Maxlinks:  ihelper.DefaultLinksPerBlock,
126
		NoCopy:    adder.NoCopy,
127
		Prefix:    adder.Prefix,
128
	}
129

Jeromy's avatar
Jeromy committed
130
	if adder.Trickle {
131
		return trickle.Layout(params.New(chnk))
132 133
	}

134
	return balanced.Layout(params.New(chnk))
135 136
}

137
// RootNode returns the root node of the Added.
138
func (adder *Adder) RootNode() (ipld.Node, error) {
Jeromy's avatar
Jeromy committed
139
	// for memoizing
Jeromy's avatar
Jeromy committed
140 141
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
142
	}
143

144 145 146 147
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
148
	root, err := mr.GetDirectory().GetNode()
Jeromy's avatar
Jeromy committed
149 150 151
	if err != nil {
		return nil, err
	}
152

Jeromy's avatar
Jeromy committed
153
	// if not wrapping, AND one root file, use that hash as root.
154 155
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
156 157
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
158
		}
159

Jeromy's avatar
Jeromy committed
160
		root = nd
Jeromy's avatar
Jeromy committed
161
	}
Jeromy's avatar
Jeromy committed
162

Jeromy's avatar
Jeromy committed
163
	adder.root = root
Jeromy's avatar
Jeromy committed
164
	return root, err
165 166
}

167 168
// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
Jeromy's avatar
Jeromy committed
169 170
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
171 172 173
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
174
	if !adder.Pin {
175 176
		return nil
	}
177

178 179 180
	rnk := root.Cid()

	err = adder.dagService.Add(adder.ctx, root)
181 182 183 184
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
185
	if adder.tempRoot != nil {
186
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
187 188 189
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
190
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
191 192
	}

193 194
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
195 196
}

197
// Finalize flushes the mfs root directory and returns the mfs root node.
198
func (adder *Adder) Finalize() (ipld.Node, error) {
199 200 201 202
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
203 204
	var root mfs.FSNode
	root = mr.GetDirectory()
Stephen Whitmore's avatar
Stephen Whitmore committed
205

206
	err = root.Flush()
Jeromy's avatar
Jeromy committed
207 208 209 210 211
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
212
	if !adder.Wrap {
Jeromy's avatar
Jeromy committed
213
		children, err := root.(*mfs.Directory).ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
214 215 216
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
217 218 219 220 221

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

Jeromy's avatar
Jeromy committed
222
		name = children[0]
Stephen Whitmore's avatar
Stephen Whitmore committed
223

224 225 226 227 228
		mr, err := adder.mfsRoot()
		if err != nil {
			return nil, err
		}

229
		dir := mr.GetDirectory()
230 231

		root, err = dir.Child(name)
Jeromy's avatar
Jeromy committed
232 233 234 235 236
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
237
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
238 239 240 241
	if err != nil {
		return nil, err
	}

242
	err = mr.Close()
243 244
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
245 246
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
247
	return root.GetNode()
Jeromy's avatar
Jeromy committed
248 249
}

Jeromy's avatar
Jeromy committed
250 251 252
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
253
		return nil
Jeromy's avatar
Jeromy committed
254
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
255
		names, err := fsn.ListNames(adder.ctx)
256 257 258 259 260
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
261 262 263 264 265 266 267 268 269 270
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
271 272

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
273
		}
Jeromy's avatar
Jeromy committed
274
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
275 276
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
277
		}
Jeromy's avatar
Jeromy committed
278

Jeromy's avatar
Jeromy committed
279 280 281 282
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
283 284
}

285 286 287
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
288
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
289 290 291
	return AddWithContext(n.Context(), n, r)
}

292
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
293
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
294
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
295

296
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
297 298 299
	if err != nil {
		return "", err
	}
300

301
	node, err := fileAdder.add(r)
302 303 304
	if err != nil {
		return "", err
	}
305

Jeromy's avatar
Jeromy committed
306
	return node.Cid().String(), nil
307
}
308 309 310

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
311
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
312

313
	stat, err := os.Lstat(root)
314 315 316
	if err != nil {
		return "", err
	}
317

camelmasa's avatar
camelmasa committed
318
	f, err := files.NewSerialFile(filepath.Base(root), root, false, stat)
319 320 321
	if err != nil {
		return "", err
	}
322
	defer f.Close()
323

324
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
325 326 327
	if err != nil {
		return "", err
	}
328

Jeromy's avatar
Jeromy committed
329
	err = fileAdder.addFile(f)
330 331 332
	if err != nil {
		return "", err
	}
333

334
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
335 336 337 338
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
339
	return nd.String(), nil
340 341
}

342 343
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
344 345
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
346
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, ipld.Node, error) {
347
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
348
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
349 350 351
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
352
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
353

354
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
355

Jeromy's avatar
Jeromy committed
356
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
357 358 359 360
	if err != nil {
		return "", nil, err
	}

361
	dagnode, err := fileAdder.Finalize()
362 363 364
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
365

Jeromy's avatar
Jeromy committed
366 367
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
368 369
}

370
func (adder *Adder) addNode(node ipld.Node, path string) error {
371 372
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
373
		path = node.Cid().String()
374
	}
375

376 377 378 379
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

380 381 382 383
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
384 385
	dir := gopath.Dir(path)
	if dir != "." {
386 387 388 389 390 391
		opts := mfs.MkdirOpts{
			Mkparents: true,
			Flush:     false,
			Prefix:    adder.Prefix,
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
392 393 394 395
			return err
		}
	}

396
	if err := mfs.PutNode(mr, path, node); err != nil {
397 398
		return err
	}
399

Jeromy's avatar
Jeromy committed
400
	if !adder.Silent {
401
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
402 403
	}
	return nil
404 405
}

406
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
407
func (adder *Adder) AddFile(file files.File) error {
408 409 410
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
411
	defer func() {
412 413 414
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
415
	}()
Jeromy's avatar
Jeromy committed
416

Jeromy's avatar
Jeromy committed
417
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
418 419 420 421 422 423 424 425
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

426 427 428 429 430 431
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
432
		if err := mr.FlushMemFree(adder.ctx); err != nil {
433 434
			return err
		}
Jeromy's avatar
Jeromy committed
435

436 437 438 439
		adder.liveNodes = 0
	}
	adder.liveNodes++

440
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
441
		return adder.addDir(file)
442 443
	}

444 445 446 447
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
448
			return err
449
		}
450

451
		dagnode := dag.NodeWithData(sdata)
452
		dagnode.SetPrefix(adder.Prefix)
453
		err = adder.dagService.Add(adder.ctx, dagnode)
454
		if err != nil {
Jeromy's avatar
Jeromy committed
455
			return err
456 457
		}

Jeromy's avatar
Jeromy committed
458
		return adder.addNode(dagnode, s.FileName())
459 460 461 462 463 464
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
465
	if adder.Progress {
466 467 468 469 470 471
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
472 473
	}

Jeromy's avatar
Jeromy committed
474
	dagnode, err := adder.add(reader)
475
	if err != nil {
Jeromy's avatar
Jeromy committed
476
		return err
477 478 479
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
480
	return adder.addNode(dagnode, file.FileName())
481 482
}

Jeromy's avatar
Jeromy committed
483
func (adder *Adder) addDir(dir files.File) error {
484
	log.Infof("adding directory: %s", dir.FileName())
485

486 487 488 489
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
490 491 492 493 494
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
		Mkparents: true,
		Flush:     false,
		Prefix:    adder.Prefix,
	})
Jeromy's avatar
Jeromy committed
495 496 497 498
	if err != nil {
		return err
	}

499 500
	for {
		file, err := dir.NextFile()
501
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
502
			return err
503 504 505
		}
		if file == nil {
			break
506 507
		}

508 509 510
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
511
			continue
512 513 514
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
515
			return err
516 517 518
		}
	}

Jeromy's avatar
Jeromy committed
519
	return nil
520
}
521

Jeromy's avatar
Jeromy committed
522
func (adder *Adder) maybePauseForGC() error {
523
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
524 525 526 527 528
		err := adder.PinRoot()
		if err != nil {
			return err
		}

529
		adder.unlocker.Unlock()
530
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
531 532 533 534
	}
	return nil
}

535
// outputDagnode sends dagnode info over the output channel
536
func outputDagnode(out chan interface{}, name string, dn ipld.Node) error {
537 538 539 540 541 542 543 544 545 546 547 548
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
549
		Size: o.Size,
550 551 552 553 554 555
	}

	return nil
}

// from core/commands/object.go
556
func getOutput(dagnode ipld.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
557
	c := dagnode.Cid()
558 559 560 561
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
562 563

	output := &Object{
Jeromy's avatar
Jeromy committed
564
		Hash:  c.String(),
565
		Size:  strconv.FormatUint(s, 10),
566
		Links: make([]Link, len(dagnode.Links())),
567 568
	}

569
	for i, link := range dagnode.Links() {
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

// progressReader wraps a file and reports how many bytes have been read from
// it by sending AddedObject messages on out.
type progressReader struct {
	file         files.File       // underlying file being read
	out          chan interface{} // channel that receives the progress messages
	bytes        int64            // total bytes read so far
	lastProgress int64            // value of bytes when progress was last reported
}

// Read reads from the wrapped file and passes its result through unchanged.
// A progress message carrying the file name and the cumulative byte count is
// sent on out whenever at least progressReaderIncrement bytes have been read
// since the last message, and once more when the file reports io.EOF.
func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		// A send on a nil channel blocks forever, so skip reporting when no
		// output channel was configured (outputDagnode does the same).
		if i.out != nil {
			i.out <- &AddedObject{
				Name:  i.file.FileName(),
				Bytes: i.bytes,
			}
		}
	}

	return n, err
}
600 601 602 603 604

// progressReader2 is a progressReader that also exposes the wrapped file's
// files.FileInfo methods. addFile uses it when the file being added
// implements files.FileInfo, so that wrapping for progress does not hide
// that interface.
type progressReader2 struct {
	*progressReader
	files.FileInfo
}