add.go 12.8 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
10
	"strconv"
Jeromy's avatar
Jeromy committed
11

Jeromy's avatar
Jeromy committed
12
	bs "github.com/ipfs/go-ipfs/blocks/blockstore"
13 14
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
15
	core "github.com/ipfs/go-ipfs/core"
16
	"github.com/ipfs/go-ipfs/exchange/offline"
17
	balanced "github.com/ipfs/go-ipfs/importer/balanced"
18
	"github.com/ipfs/go-ipfs/importer/chunk"
19 20
	ihelper "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
Jeromy's avatar
Jeromy committed
21
	dag "github.com/ipfs/go-ipfs/merkledag"
Jeromy's avatar
Jeromy committed
22
	mfs "github.com/ipfs/go-ipfs/mfs"
23
	"github.com/ipfs/go-ipfs/pin"
24
	posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"
Jeromy's avatar
Jeromy committed
25 26
	unixfs "github.com/ipfs/go-ipfs/unixfs"

keks's avatar
keks committed
27
	files "gx/ipfs/QmNaA1HxkbVtweGfabDMy2DMLvqQ1eg3LNEqDMVA3zCoz1/go-ipfs-cmdkit/files"
Steven Allen's avatar
Steven Allen committed
28
	node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
Jeromy's avatar
Jeromy committed
29
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
30 31
	ds "gx/ipfs/QmdHG8MAuARdGHxx4rPQASLcvhz24fzjSQq7AJRAQEorq5/go-datastore"
	syncds "gx/ipfs/QmdHG8MAuARdGHxx4rPQASLcvhz24fzjSQq7AJRAQEorq5/go-datastore/sync"
Steven Allen's avatar
Steven Allen committed
32
	cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
33 34
)

Jeromy's avatar
Jeromy committed
35
var log = logging.Logger("coreunix")
36

37 38 39
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

40 41
var liveCacheSize = uint64(256 << 10)

42 43 44 45 46 47 48 49
// Link is the printable description of one link of a DAG node: the link
// name, the hash of its target and the size recorded on the link.
type Link struct {
	Name string
	Hash string
	Size uint64
}

type Object struct {
	Hash  string
	Links []Link
50
	Size  string
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
}

// hiddenFileError is an error value that carries the name of a hidden file.
type hiddenFileError struct {
	fileName string // name inserted into the error message
}

// Error implements the error interface.
func (h *hiddenFileError) Error() string {
	const format = "%s is a hidden file"
	return fmt.Sprintf(format, h.fileName)
}

// ignoreFileError is an error value that carries the name of an ignored file.
type ignoreFileError struct {
	fileName string // name inserted into the error message
}

// Error implements the error interface.
func (ig *ignoreFileError) Error() string {
	const format = "%s is an ignored file"
	return fmt.Sprintf(format, ig.fileName)
}

// AddedObject is the message sent on an Adder's output channel. Progress
// updates carry Name and Bytes; completed nodes carry Name, Hash and Size.
// Empty Hash, Bytes and Size values are omitted from the JSON encoding.
type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
	Size  string `json:",omitempty"`
}

76
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
77
	return &Adder{
78 79 80 81 82 83 84 85 86 87
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
88
	}, nil
89 90
}

91
// Adder holds the switches passed to the `add` command.
92
type Adder struct {
93 94 95 96 97 98 99 100 101
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
	dagService dag.DAGService
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
102
	RawLeaves  bool
103 104
	Silent     bool
	Wrap       bool
105
	NoCopy     bool
106
	Chunker    string
Jeromy's avatar
Jeromy committed
107
	root       node.Node
108
	mroot      *mfs.Root
109
	unlocker   bs.Unlocker
Jeromy's avatar
Jeromy committed
110
	tempRoot   *cid.Cid
111
	Prefix     *cid.Prefix
112
	liveNodes  uint64
113 114 115 116 117 118 119 120 121 122 123 124 125 126
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
	rnode.SetPrefix(adder.Prefix)
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
127 128
}

Jeromy's avatar
Jeromy committed
129
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
130
	adder.mroot = r
Jeromy's avatar
Jeromy committed
131 132
}

133
// Constructs a node from reader's data, and adds it. Doesn't pin.
134
func (adder *Adder) add(reader io.Reader) (node.Node, error) {
Jeromy's avatar
Jeromy committed
135
	chnk, err := chunk.FromString(reader, adder.Chunker)
136 137 138
	if err != nil {
		return nil, err
	}
139

140 141 142 143
	params := ihelper.DagBuilderParams{
		Dagserv:   adder.dagService,
		RawLeaves: adder.RawLeaves,
		Maxlinks:  ihelper.DefaultLinksPerBlock,
144
		NoCopy:    adder.NoCopy,
145
		Prefix:    adder.Prefix,
146
	}
147

Jeromy's avatar
Jeromy committed
148
	if adder.Trickle {
149 150 151 152
		return trickle.TrickleLayout(params.New(chnk))
	}

	return balanced.BalancedLayout(params.New(chnk))
153 154
}

Jeromy's avatar
Jeromy committed
155
func (adder *Adder) RootNode() (node.Node, error) {
Jeromy's avatar
Jeromy committed
156
	// for memoizing
Jeromy's avatar
Jeromy committed
157 158
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
159
	}
160

161 162 163 164 165
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root, err := mr.GetValue().GetNode()
Jeromy's avatar
Jeromy committed
166 167 168
	if err != nil {
		return nil, err
	}
169

Jeromy's avatar
Jeromy committed
170
	// if not wrapping, AND one root file, use that hash as root.
171 172
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
173 174
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
175
		}
176

Jeromy's avatar
Jeromy committed
177
		root = nd
Jeromy's avatar
Jeromy committed
178
	}
Jeromy's avatar
Jeromy committed
179

Jeromy's avatar
Jeromy committed
180
	adder.root = root
Jeromy's avatar
Jeromy committed
181
	return root, err
182 183
}

Jeromy's avatar
Jeromy committed
184 185
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
186 187 188
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
189
	if !adder.Pin {
190 191
		return nil
	}
192

193
	rnk, err := adder.dagService.Add(root)
194 195 196 197
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
198
	if adder.tempRoot != nil {
199
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
200 201 202
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
203
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
204 205
	}

206 207
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
208 209
}

Jeromy's avatar
Jeromy committed
210
func (adder *Adder) Finalize() (node.Node, error) {
211 212 213 214 215
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root := mr.GetValue()
Stephen Whitmore's avatar
Stephen Whitmore committed
216

217
	err = root.Flush()
Jeromy's avatar
Jeromy committed
218 219 220 221 222
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
223
	if !adder.Wrap {
Jeromy's avatar
Jeromy committed
224
		children, err := root.(*mfs.Directory).ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
225 226 227
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
228 229 230 231 232

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

Jeromy's avatar
Jeromy committed
233
		name = children[0]
Stephen Whitmore's avatar
Stephen Whitmore committed
234

235 236 237 238 239 240
		mr, err := adder.mfsRoot()
		if err != nil {
			return nil, err
		}

		dir, ok := mr.GetValue().(*mfs.Directory)
241 242 243 244 245
		if !ok {
			return nil, fmt.Errorf("root is not a directory")
		}

		root, err = dir.Child(name)
Jeromy's avatar
Jeromy committed
246 247 248 249 250
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
251
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
252 253 254 255
	if err != nil {
		return nil, err
	}

256
	err = mr.Close()
257 258
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
259 260
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
261
	return root.GetNode()
Jeromy's avatar
Jeromy committed
262 263
}

Jeromy's avatar
Jeromy committed
264 265 266
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
267
		return nil
Jeromy's avatar
Jeromy committed
268
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
269
		names, err := fsn.ListNames(adder.ctx)
270 271 272 273 274
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
275 276 277 278 279 280 281 282 283 284
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
285 286

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
287
		}
Jeromy's avatar
Jeromy committed
288
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
289 290
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
291
		}
Jeromy's avatar
Jeromy committed
292

Jeromy's avatar
Jeromy committed
293 294 295 296
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
297 298
}

299 300 301
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
302
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
303 304 305
	return AddWithContext(n.Context(), n, r)
}

306
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
307
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
308
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
309

310
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
311 312 313
	if err != nil {
		return "", err
	}
314

315
	node, err := fileAdder.add(r)
316 317 318
	if err != nil {
		return "", err
	}
319

Jeromy's avatar
Jeromy committed
320
	return node.Cid().String(), nil
321
}
322 323 324

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
325
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
326

327
	stat, err := os.Lstat(root)
328 329 330
	if err != nil {
		return "", err
	}
331

Jeromy's avatar
Jeromy committed
332
	f, err := files.NewSerialFile(root, root, false, stat)
333 334 335
	if err != nil {
		return "", err
	}
336
	defer f.Close()
337

338
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
339 340 341
	if err != nil {
		return "", err
	}
342

Jeromy's avatar
Jeromy committed
343
	err = fileAdder.addFile(f)
344 345 346
	if err != nil {
		return "", err
	}
347

348
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
349 350 351 352
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
353
	return nd.String(), nil
354 355
}

356 357
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
358 359
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
Jeromy's avatar
Jeromy committed
360
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, node.Node, error) {
361
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
362
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
363 364 365
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
366
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
367

368
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
369

Jeromy's avatar
Jeromy committed
370
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
371 372 373 374
	if err != nil {
		return "", nil, err
	}

375
	dagnode, err := fileAdder.Finalize()
376 377 378
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
379

Jeromy's avatar
Jeromy committed
380 381
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
382 383
}

384
func (adder *Adder) addNode(node node.Node, path string) error {
385 386
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
387
		path = node.Cid().String()
388
	}
389

390 391 392 393
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

394 395 396 397
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
398 399
	dir := gopath.Dir(path)
	if dir != "." {
400 401 402 403 404 405
		opts := mfs.MkdirOpts{
			Mkparents: true,
			Flush:     false,
			Prefix:    adder.Prefix,
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
406 407 408 409
			return err
		}
	}

410
	if err := mfs.PutNode(mr, path, node); err != nil {
411 412
		return err
	}
413

Jeromy's avatar
Jeromy committed
414
	if !adder.Silent {
415
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
416 417
	}
	return nil
418 419
}

420
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
421
func (adder *Adder) AddFile(file files.File) error {
422 423 424
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
425
	defer func() {
426 427 428
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
429
	}()
Jeromy's avatar
Jeromy committed
430

Jeromy's avatar
Jeromy committed
431
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
432 433 434 435 436 437 438 439
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

440 441 442 443 444 445
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
446
		if err := mr.FlushMemFree(adder.ctx); err != nil {
447 448
			return err
		}
Jeromy's avatar
Jeromy committed
449

450 451 452 453
		adder.liveNodes = 0
	}
	adder.liveNodes++

454
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
455
		return adder.addDir(file)
456 457
	}

458 459 460 461
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
462
			return err
463
		}
464

465
		dagnode := dag.NodeWithData(sdata)
466
		dagnode.SetPrefix(adder.Prefix)
467
		_, err = adder.dagService.Add(dagnode)
468
		if err != nil {
Jeromy's avatar
Jeromy committed
469
			return err
470 471
		}

Jeromy's avatar
Jeromy committed
472
		return adder.addNode(dagnode, s.FileName())
473 474 475 476 477 478
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
479
	if adder.Progress {
480 481 482 483 484 485
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
486 487
	}

Jeromy's avatar
Jeromy committed
488
	dagnode, err := adder.add(reader)
489
	if err != nil {
Jeromy's avatar
Jeromy committed
490
		return err
491 492 493
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
494
	return adder.addNode(dagnode, file.FileName())
495 496
}

Jeromy's avatar
Jeromy committed
497
func (adder *Adder) addDir(dir files.File) error {
498
	log.Infof("adding directory: %s", dir.FileName())
499

500 501 502 503
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
504 505 506 507 508
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
		Mkparents: true,
		Flush:     false,
		Prefix:    adder.Prefix,
	})
Jeromy's avatar
Jeromy committed
509 510 511 512
	if err != nil {
		return err
	}

513 514
	for {
		file, err := dir.NextFile()
515
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
516
			return err
517 518 519
		}
		if file == nil {
			break
520 521
		}

522 523 524
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
525
			continue
526 527 528
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
529
			return err
530 531 532
		}
	}

Jeromy's avatar
Jeromy committed
533
	return nil
534
}
535

Jeromy's avatar
Jeromy committed
536
func (adder *Adder) maybePauseForGC() error {
537
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
538 539 540 541 542
		err := adder.PinRoot()
		if err != nil {
			return err
		}

543
		adder.unlocker.Unlock()
544
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
545 546 547 548
	}
	return nil
}

549
// outputDagnode sends dagnode info over the output channel
550
func outputDagnode(out chan interface{}, name string, dn node.Node) error {
551 552 553 554 555 556 557 558 559 560 561 562
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
563
		Size: o.Size,
564 565 566 567 568
	}

	return nil
}

569
func NewMemoryDagService() dag.DAGService {
570 571 572
	// build mem-datastore for editor's intermediary nodes
	bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	bsrv := bserv.New(bs, offline.Exchange(bs))
573
	return dag.NewDAGService(bsrv)
574 575 576
}

// from core/commands/object.go
577
func getOutput(dagnode node.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
578
	c := dagnode.Cid()
579 580 581 582
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
583 584

	output := &Object{
Jeromy's avatar
Jeromy committed
585
		Hash:  c.String(),
586
		Size:  strconv.FormatUint(s, 10),
587
		Links: make([]Link, len(dagnode.Links())),
588 589
	}

590
	for i, link := range dagnode.Links() {
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

// progressReader wraps a file and reports how many bytes have been read from
// it by sending AddedObject messages on out.
type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64 // total bytes read so far
	lastProgress int64 // value of bytes when the last update was sent
}

// Read reads from the wrapped file and sends a progress update whenever at
// least progressReaderIncrement bytes were read since the last update, or
// when the file reports io.EOF. It returns exactly what the file returned.
func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		// A send on a nil channel blocks forever, so skip the update when
		// no output channel is configured (as outputDagnode does).
		if i.out != nil {
			i.out <- &AddedObject{
				Name:  i.file.FileName(),
				Bytes: i.bytes,
			}
		}
	}

	return n, err
}
621 622 623 624 625

// progressReader2 is a progressReader that additionally exposes the
// files.FileInfo methods of the file it wraps. Both members are embedded and
// the type is built with a positional literal, so their order must not change.
type progressReader2 struct {
	*progressReader // supplies Read with progress reporting
	files.FileInfo  // supplies the file-info methods of the wrapped file
}